chore: remove deprecated functions, cleanup typehints
ion-elgreco committed Aug 20, 2024
1 parent bc783cf commit 1f45881
Showing 3 changed files with 31 additions and 129 deletions.
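The headline change: the deprecated callable form `dt.optimize()` is dropped, and both the docs and `TableOptimizer` now go through `dt.optimize.compact()`. A minimal sketch of the surviving call, assuming a Delta table already exists at `observation_data` as in the docs page below:

```python
from deltalake import DeltaTable

dt = DeltaTable("observation_data")

# compact() accepts partition_filters, target_size, max_concurrent_tasks
# and min_commit_interval (see the table.py diff below); only one knob is
# shown here, with an illustrative 256 MiB target file size.
metrics = dt.optimize.compact(target_size=256 * 1024 * 1024)
print(metrics)  # metrics dict describing the compaction
```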
2 changes: 1 addition & 1 deletion docs/usage/optimize/small-file-compaction-with-optimize.md
@@ -104,7 +104,7 @@ Let’s run the optimize command to compact the existing small files into larger
```python
dt = DeltaTable("observation_data")

dt.optimize()
dt.optimize.compact()
```

Here’s the output of the command:
13 changes: 8 additions & 5 deletions python/deltalake/_internal.pyi
@@ -48,8 +48,10 @@ class RawDeltaTable:
def protocol_versions(self) -> List[Any]: ...
def load_version(self, version: int) -> None: ...
def load_with_datetime(self, ds: str) -> None: ...
def files(self, partition_filters: Optional[FilterType]) -> List[str]: ...
def file_uris(self, partition_filters: Optional[FilterType]) -> List[str]: ...
def files(self, partition_filters: Optional[PartitionFilterType]) -> List[str]: ...
def file_uris(
self, partition_filters: Optional[PartitionFilterType]
) -> List[str]: ...
def vacuum(
self,
dry_run: bool,
@@ -60,7 +62,7 @@
) -> List[str]: ...
def compact_optimize(
self,
partition_filters: Optional[FilterType],
partition_filters: Optional[PartitionFilterType],
target_size: Optional[int],
max_concurrent_tasks: Optional[int],
min_commit_interval: Optional[int],
@@ -71,7 +73,7 @@
def z_order_optimize(
self,
z_order_columns: List[str],
partition_filters: Optional[FilterType],
partition_filters: Optional[PartitionFilterType],
target_size: Optional[int],
max_concurrent_tasks: Optional[int],
max_spill_size: Optional[int],
@@ -115,7 +117,7 @@ class RawDeltaTable:
def history(self, limit: Optional[int]) -> List[str]: ...
def update_incremental(self) -> None: ...
def dataset_partitions(
self, schema: pyarrow.Schema, partition_filters: Optional[FilterType]
self, schema: pyarrow.Schema, partition_filters: Optional[FilterConjunctionType]
) -> List[Any]: ...
def create_checkpoint(self) -> None: ...
def get_add_actions(self, flatten: bool) -> pyarrow.RecordBatch: ...
@@ -848,3 +850,4 @@ FilterLiteralType = Tuple[str, str, Any]
FilterConjunctionType = List[FilterLiteralType]
FilterDNFType = List[FilterConjunctionType]
FilterType = Union[FilterConjunctionType, FilterDNFType]
PartitionFilterType = List[Tuple[str, str, Union[str, List[str]]]]
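The stub cleanup separates the loosely typed filter aliases from the stringified `PartitionFilterType` that the Rust bindings now take for `files`, `file_uris` and the optimize methods. An illustrative sketch of the two shapes (column names and values are made up); the actual conversion is done by `_stringify_partition_values` in `table.py`:

```python
from typing import Any, List, Tuple, Union

# Aliases as defined in the stubs above.
FilterLiteralType = Tuple[str, str, Any]
FilterConjunctionType = List[FilterLiteralType]
PartitionFilterType = List[Tuple[str, str, Union[str, List[str]]]]

# What a caller may pass to files()/file_uris(): arbitrary Python literals.
user_filters: FilterConjunctionType = [("year", "=", 2024), ("country", "in", ["DE", "NL"])]

# What the Rust layer receives after stringification: scalars become str.
passed_down: PartitionFilterType = [("year", "=", "2024"), ("country", "in", ["DE", "NL"])]
```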
145 changes: 22 additions & 123 deletions python/deltalake/table.py
@@ -38,7 +38,9 @@
if TYPE_CHECKING:
import os

from deltalake._internal import RawDeltaTable
from deltalake._internal import (
RawDeltaTable,
)
from deltalake._internal import create_deltalake as _create_deltalake
from deltalake._util import encode_partition_value
from deltalake.data_catalog import DataCatalog
@@ -63,6 +65,12 @@
NOT_SUPPORTED_READER_VERSION = 2
SUPPORTED_READER_FEATURES = {"timestampNtz"}

FilterLiteralType = Tuple[str, str, Any]
FilterConjunctionType = List[FilterLiteralType]
FilterDNFType = List[FilterConjunctionType]
FilterType = Union[FilterConjunctionType, FilterDNFType]
PartitionFilterType = List[Tuple[str, str, Union[str, List[str]]]]


class Compression(Enum):
UNCOMPRESSED = "UNCOMPRESSED"
@@ -336,15 +344,6 @@ class ProtocolVersions(NamedTuple):
reader_features: Optional[List[str]]


FilterLiteralType = Tuple[str, str, Any]

FilterConjunctionType = List[FilterLiteralType]

FilterDNFType = List[FilterConjunctionType]

FilterType = Union[FilterConjunctionType, FilterDNFType]


@dataclass(init=False)
class DeltaTable:
"""Represents a Delta Table"""
@@ -544,10 +543,10 @@ def files(
("z", "not in", ["a","b"])
```
"""
return self._table.files(self.__stringify_partition_values(partition_filters))
return self._table.files(self._stringify_partition_values(partition_filters))

def file_uris(
self, partition_filters: Optional[List[Tuple[str, str, Any]]] = None
self, partition_filters: Optional[FilterConjunctionType] = None
) -> List[str]:
"""
Get the list of files as absolute URIs, including the scheme (e.g. "s3://").
@@ -582,7 +581,7 @@ def file_uris(
```
"""
return self._table.file_uris(
self.__stringify_partition_values(partition_filters)
self._stringify_partition_values(partition_filters)
)

file_uris.__doc__ = ""
@@ -629,48 +628,6 @@ def load_as_version(self, version: Union[int, str, datetime]) -> None:
"Invalid datatype provided for version, only int, str or datetime are accepted."
)

def load_version(self, version: int) -> None:
"""
Load a DeltaTable with a specified version.
!!! warning "Deprecated"
Load_version and load_with_datetime have been combined into `DeltaTable.load_as_version`.
Args:
version: the identifier of the version of the DeltaTable to load
"""
warnings.warn(
"Call to deprecated method DeltaTable.load_version. Use DeltaTable.load_as_version() instead.",
category=DeprecationWarning,
stacklevel=2,
)
self._table.load_version(version)

def load_with_datetime(self, datetime_string: str) -> None:
"""
Time travel Delta table to the latest version that's created at or before provided `datetime_string` argument.
The `datetime_string` argument should be an RFC 3339 and ISO 8601 date and time string.
!!! warning "Deprecated"
Load_version and load_with_datetime have been combined into `DeltaTable.load_as_version`.
Args:
datetime_string: the identifier of the datetime point of the DeltaTable to load
Example:
```
"2018-01-26T18:30:09Z"
"2018-12-19T16:39:57-08:00"
"2018-01-26T18:30:09.453+00:00"
```
"""
warnings.warn(
"Call to deprecated method DeltaTable.load_with_datetime. Use DeltaTable.load_as_version() instead.",
category=DeprecationWarning,
stacklevel=2,
)
self._table.load_with_datetime(datetime_string)

def load_cdf(
self,
starting_version: int = 0,
@@ -700,7 +657,7 @@ def schema(self) -> DeltaSchema:
"""
return self._table.schema

def files_by_partitions(self, partition_filters: FilterType) -> List[str]:
def files_by_partitions(self, partition_filters: PartitionFilterType) -> List[str]:
"""
Get the files for each partition
@@ -711,7 +668,7 @@ def files_by_partitions(self, partition_filters: FilterType) -> List[str]:
stacklevel=2,
)

return self.files(partition_filters) # type: ignore
return self.files(partition_filters)

def metadata(self) -> Metadata:
"""
@@ -1045,7 +1002,7 @@ def restore(

def to_pyarrow_dataset(
self,
partitions: Optional[List[Tuple[str, str, Any]]] = None,
partitions: Optional[FilterConjunctionType] = None,
filesystem: Optional[Union[str, pa_fs.FileSystem]] = None,
parquet_read_options: Optional[ParquetReadOptions] = None,
schema: Optional[pyarrow.Schema] = None,
@@ -1206,9 +1163,9 @@ def cleanup_metadata(self) -> None:
"""
self._table.cleanup_metadata()

def __stringify_partition_values(
self, partition_filters: Optional[List[Tuple[str, str, Any]]]
) -> Optional[List[Tuple[str, str, Union[str, List[str]]]]]:
def _stringify_partition_values(
self, partition_filters: Optional[FilterConjunctionType]
) -> Optional[PartitionFilterType]:
if partition_filters is None:
return partition_filters
out = []
@@ -1370,45 +1327,6 @@ def __init__(
self.not_matched_by_source_delete_predicate: Optional[List[str]] = None
self.not_matched_by_source_delete_all: Optional[bool] = None

def with_writer_properties(
self,
data_page_size_limit: Optional[int] = None,
dictionary_page_size_limit: Optional[int] = None,
data_page_row_count_limit: Optional[int] = None,
write_batch_size: Optional[int] = None,
max_row_group_size: Optional[int] = None,
) -> "TableMerger":
"""
!!! warning "Deprecated"
Use `.merge(writer_properties = WriterProperties())` instead
Pass writer properties to the Rust parquet writer, see options https://arrow.apache.org/rust/parquet/file/properties/struct.WriterProperties.html:
Args:
data_page_size_limit: Limit DataPage size to this in bytes.
dictionary_page_size_limit: Limit the size of each DataPage to store dicts to this amount in bytes.
data_page_row_count_limit: Limit the number of rows in each DataPage.
write_batch_size: Splits internally to smaller batch size.
max_row_group_size: Max number of rows in row group.
Returns:
TableMerger: TableMerger Object
"""
warnings.warn(
"Call to deprecated method TableMerger.with_writer_properties. Use DeltaTable.merge(writer_properties=WriterProperties()) instead.",
category=DeprecationWarning,
stacklevel=2,
)

writer_properties: Dict[str, Any] = {
"data_page_size_limit": data_page_size_limit,
"dictionary_page_size_limit": dictionary_page_size_limit,
"data_page_row_count_limit": data_page_row_count_limit,
"write_batch_size": write_batch_size,
"max_row_group_size": max_row_group_size,
}
self.writer_properties = WriterProperties(**writer_properties)
return self

def when_matched_update(
self, updates: Dict[str, str], predicate: Optional[str] = None
) -> "TableMerger":
@@ -2003,28 +1921,9 @@ class TableOptimizer:
def __init__(self, table: DeltaTable):
self.table = table

def __call__(
self,
partition_filters: Optional[FilterType] = None,
target_size: Optional[int] = None,
max_concurrent_tasks: Optional[int] = None,
) -> Dict[str, Any]:
"""
!!! warning "DEPRECATED 0.10.0"
Use [compact][deltalake.table.DeltaTable.compact] instead, which has the same signature.
"""

warnings.warn(
"Call to deprecated method DeltaTable.optimize. Use DeltaTable.optimize.compact() instead.",
category=DeprecationWarning,
stacklevel=2,
)

return self.compact(partition_filters, target_size, max_concurrent_tasks)

def compact(
self,
partition_filters: Optional[FilterType] = None,
partition_filters: Optional[FilterConjunctionType] = None,
target_size: Optional[int] = None,
max_concurrent_tasks: Optional[int] = None,
min_commit_interval: Optional[Union[int, timedelta]] = None,
@@ -2079,7 +1978,7 @@ def compact(
min_commit_interval = int(min_commit_interval.total_seconds())

metrics = self.table._table.compact_optimize(
partition_filters,
self.table._stringify_partition_values(partition_filters),
target_size,
max_concurrent_tasks,
min_commit_interval,
@@ -2093,7 +1992,7 @@ def z_order(
def z_order(
self,
columns: Iterable[str],
partition_filters: Optional[FilterType] = None,
partition_filters: Optional[FilterConjunctionType] = None,
target_size: Optional[int] = None,
max_concurrent_tasks: Optional[int] = None,
max_spill_size: int = 20 * 1024 * 1024 * 1024,
@@ -2148,7 +2047,7 @@ def z_order(

metrics = self.table._table.z_order_optimize(
list(columns),
partition_filters,
self.table._stringify_partition_values(partition_filters),
target_size,
max_concurrent_tasks,
max_spill_size,
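The other deleted methods (`load_version`, `load_with_datetime`, `with_writer_properties`) each named their replacement in the deprecation warning they used to emit. A short migration sketch collecting those replacements; the table path, source data and merge predicate are placeholders, and the import locations are assumed from the current package layout:

```python
from datetime import datetime

import pyarrow as pa
from deltalake import DeltaTable, WriterProperties

dt = DeltaTable("path/to/table")  # placeholder path

# load_version(...) / load_with_datetime(...)  ->  load_as_version(...)
dt.load_as_version(5)                       # by version number
dt.load_as_version("2018-01-26T18:30:09Z")  # by RFC 3339 / ISO 8601 string
dt.load_as_version(datetime(2024, 8, 20))   # by datetime object

# TableMerger.with_writer_properties(...)  ->  merge(writer_properties=...)
source = pa.table({"id": [1], "value": ["a"]})  # illustrative source data
(
    dt.merge(
        source=source,
        predicate="target.id = source.id",
        source_alias="source",
        target_alias="target",
        writer_properties=WriterProperties(max_row_group_size=128_000),
    )
    .when_matched_update(updates={"value": "source.value"})
    .execute()
)
```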
