chore(python): remove deprecated or duplicated functions #2801

Merged 3 commits on Aug 20, 2024
@@ -104,7 +104,7 @@ Let’s run the optimize command to compact the existing small files into larger
```diff
 dt = DeltaTable("observation_data")

-dt.optimize()
+dt.optimize.compact()
```

Here’s the output of the command:
14 changes: 7 additions & 7 deletions python/deltalake/_internal.pyi
@@ -48,11 +48,10 @@ class RawDeltaTable:
def protocol_versions(self) -> List[Any]: ...
def load_version(self, version: int) -> None: ...
def load_with_datetime(self, ds: str) -> None: ...
-    def files_by_partitions(
-        self, partitions_filters: Optional[FilterType]
-    ) -> List[str]: ...
-    def files(self, partition_filters: Optional[FilterType]) -> List[str]: ...
-    def file_uris(self, partition_filters: Optional[FilterType]) -> List[str]: ...
+    def files(self, partition_filters: Optional[PartitionFilterType]) -> List[str]: ...
+    def file_uris(
+        self, partition_filters: Optional[PartitionFilterType]
+    ) -> List[str]: ...
def vacuum(
self,
dry_run: bool,
@@ -63,7 +62,7 @@ class RawDeltaTable:
) -> List[str]: ...
def compact_optimize(
self,
-        partition_filters: Optional[FilterType],
+        partition_filters: Optional[PartitionFilterType],
target_size: Optional[int],
max_concurrent_tasks: Optional[int],
min_commit_interval: Optional[int],
@@ -74,7 +73,7 @@ class RawDeltaTable:
def z_order_optimize(
self,
z_order_columns: List[str],
-        partition_filters: Optional[FilterType],
+        partition_filters: Optional[PartitionFilterType],
target_size: Optional[int],
max_concurrent_tasks: Optional[int],
max_spill_size: Optional[int],
@@ -118,7 +117,7 @@ class RawDeltaTable:
def history(self, limit: Optional[int]) -> List[str]: ...
def update_incremental(self) -> None: ...
def dataset_partitions(
-        self, schema: pyarrow.Schema, partition_filters: Optional[FilterType]
+        self, schema: pyarrow.Schema, partition_filters: Optional[FilterConjunctionType]
) -> List[Any]: ...
def create_checkpoint(self) -> None: ...
def get_add_actions(self, flatten: bool) -> pyarrow.RecordBatch: ...
@@ -851,3 +850,4 @@ FilterLiteralType = Tuple[str, str, Any]
FilterConjunctionType = List[FilterLiteralType]
FilterDNFType = List[FilterConjunctionType]
FilterType = Union[FilterConjunctionType, FilterDNFType]
+PartitionFilterType = List[Tuple[str, str, Union[str, List[str]]]]
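
For orientation, here is a hedged sketch of values that satisfy these aliases (the aliases are copied from the stubs above; the column names are hypothetical):

```python
from typing import Any, List, Tuple, Union

FilterLiteralType = Tuple[str, str, Any]
FilterConjunctionType = List[FilterLiteralType]
PartitionFilterType = List[Tuple[str, str, Union[str, List[str]]]]

# A conjunction of filter literals: year == 2021 AND country in {US, CA}.
conjunction: FilterConjunctionType = [
    ("year", "=", 2021),
    ("country", "in", ["US", "CA"]),
]

# The same filter with every value already a string (or list of strings),
# which is the shape the new PartitionFilterType alias describes.
partition_filter: PartitionFilterType = [
    ("year", "=", "2021"),
    ("country", "in", ["US", "CA"]),
]
```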
151 changes: 28 additions & 123 deletions python/deltalake/table.py
@@ -38,7 +38,9 @@
if TYPE_CHECKING:
import os

-from deltalake._internal import RawDeltaTable
+from deltalake._internal import (
+    RawDeltaTable,
+)
from deltalake._internal import create_deltalake as _create_deltalake
from deltalake._util import encode_partition_value
from deltalake.data_catalog import DataCatalog
@@ -63,6 +65,12 @@
NOT_SUPPORTED_READER_VERSION = 2
SUPPORTED_READER_FEATURES = {"timestampNtz"}

+FilterLiteralType = Tuple[str, str, Any]
+FilterConjunctionType = List[FilterLiteralType]
+FilterDNFType = List[FilterConjunctionType]
+FilterType = Union[FilterConjunctionType, FilterDNFType]
+PartitionFilterType = List[Tuple[str, str, Union[str, List[str]]]]


class Compression(Enum):
UNCOMPRESSED = "UNCOMPRESSED"
@@ -336,15 +344,6 @@ class ProtocolVersions(NamedTuple):
reader_features: Optional[List[str]]


-FilterLiteralType = Tuple[str, str, Any]
-
-FilterConjunctionType = List[FilterLiteralType]
-
-FilterDNFType = List[FilterConjunctionType]
-
-FilterType = Union[FilterConjunctionType, FilterDNFType]


@dataclass(init=False)
class DeltaTable:
"""Represents a Delta Table"""
@@ -544,10 +543,10 @@ def files(
("z", "not in", ["a","b"])
```
"""
-        return self._table.files(self.__stringify_partition_values(partition_filters))
+        return self._table.files(self._stringify_partition_values(partition_filters))

def file_uris(
-        self, partition_filters: Optional[List[Tuple[str, str, Any]]] = None
+        self, partition_filters: Optional[FilterConjunctionType] = None
) -> List[str]:
"""
Get the list of files as absolute URIs, including the scheme (e.g. "s3://").
@@ -582,7 +581,7 @@ def file_uris(
```
"""
return self._table.file_uris(
-            self.__stringify_partition_values(partition_filters)
+            self._stringify_partition_values(partition_filters)
)

file_uris.__doc__ = ""
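
As a usage sketch for the retyped `file_uris` (a minimal example, reusing the table path from the docs diff above and assuming the table exists): filter values need not be strings, since the private helper stringifies them before they cross into Rust.

```python
from deltalake import DeltaTable

dt = DeltaTable("observation_data")  # assumed existing table

# The int 2021 becomes "2021" via _stringify_partition_values before
# the filter is handed to the Rust bindings.
uris = dt.file_uris(partition_filters=[("year", "=", 2021)])
```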
@@ -629,48 +628,6 @@ def load_as_version(self, version: Union[int, str, datetime]) -> None:
"Invalid datatype provided for version, only int, str or datetime are accepted."
)

-    def load_version(self, version: int) -> None:
-        """
-        Load a DeltaTable with a specified version.
-
-        !!! warning "Deprecated"
-            Load_version and load_with_datetime have been combined into `DeltaTable.load_as_version`.
-
-        Args:
-            version: the identifier of the version of the DeltaTable to load
-        """
-        warnings.warn(
-            "Call to deprecated method DeltaTable.load_version. Use DeltaTable.load_as_version() instead.",
-            category=DeprecationWarning,
-            stacklevel=2,
-        )
-        self._table.load_version(version)
-
-    def load_with_datetime(self, datetime_string: str) -> None:
-        """
-        Time travel Delta table to the latest version that's created at or before provided `datetime_string` argument.
-        The `datetime_string` argument should be an RFC 3339 and ISO 8601 date and time string.
-
-        !!! warning "Deprecated"
-            Load_version and load_with_datetime have been combined into `DeltaTable.load_as_version`.
-
-        Args:
-            datetime_string: the identifier of the datetime point of the DeltaTable to load
-
-        Example:
-            ```
-            "2018-01-26T18:30:09Z"
-            "2018-12-19T16:39:57-08:00"
-            "2018-01-26T18:30:09.453+00:00"
-            ```
-        """
-        warnings.warn(
-            "Call to deprecated method DeltaTable.load_with_datetime. Use DeltaTable.load_as_version() instead.",
-            category=DeprecationWarning,
-            stacklevel=2,
-        )
-        self._table.load_with_datetime(datetime_string)
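
A hedged migration sketch for the two removed methods (table path reused from the docs example, assumed to exist); per the `load_as_version` signature above, an int, an RFC 3339 / ISO 8601 string, or a `datetime` all work:

```python
from datetime import datetime, timezone

from deltalake import DeltaTable

dt = DeltaTable("observation_data")  # assumed existing table

dt.load_as_version(5)  # formerly dt.load_version(5)

# Formerly dt.load_with_datetime("2018-01-26T18:30:09Z"):
dt.load_as_version("2018-01-26T18:30:09Z")
dt.load_as_version(datetime(2018, 1, 26, 18, 30, 9, tzinfo=timezone.utc))
```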

def load_cdf(
self,
starting_version: int = 0,
@@ -700,12 +657,18 @@ def schema(self) -> DeltaSchema:
"""
return self._table.schema

-    def files_by_partitions(self, partition_filters: Optional[FilterType]) -> List[str]:
+    def files_by_partitions(self, partition_filters: PartitionFilterType) -> List[str]:
         """
         Get the files for each partition

         """
-        return self._table.files_by_partitions(partition_filters)
+        warnings.warn(
+            "files_by_partitions is deprecated, please use DeltaTable.files() instead.",
+            category=DeprecationWarning,
+            stacklevel=2,
+        )
+
+        return self.files(partition_filters)
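
A before/after sketch of the shim above (table path reused from the docs example, assumed to exist): both spellings return the same list, but the old one now raises a `DeprecationWarning`.

```python
import warnings

from deltalake import DeltaTable

dt = DeltaTable("observation_data")  # assumed existing table

with warnings.catch_warnings():
    warnings.simplefilter("ignore", DeprecationWarning)
    legacy = dt.files_by_partitions([("year", "=", "2021")])

current = dt.files(partition_filters=[("year", "=", "2021")])
assert legacy == current
```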

def metadata(self) -> Metadata:
"""
@@ -1039,7 +1002,7 @@ def restore(

def to_pyarrow_dataset(
self,
-        partitions: Optional[List[Tuple[str, str, Any]]] = None,
+        partitions: Optional[FilterConjunctionType] = None,
filesystem: Optional[Union[str, pa_fs.FileSystem]] = None,
parquet_read_options: Optional[ParquetReadOptions] = None,
schema: Optional[pyarrow.Schema] = None,
@@ -1200,9 +1163,9 @@ def cleanup_metadata(self) -> None:
"""
self._table.cleanup_metadata()

-    def __stringify_partition_values(
-        self, partition_filters: Optional[List[Tuple[str, str, Any]]]
-    ) -> Optional[List[Tuple[str, str, Union[str, List[str]]]]]:
+    def _stringify_partition_values(
+        self, partition_filters: Optional[FilterConjunctionType]
+    ) -> Optional[PartitionFilterType]:
if partition_filters is None:
return partition_filters
out = []
@@ -1364,45 +1327,6 @@ def __init__(
self.not_matched_by_source_delete_predicate: Optional[List[str]] = None
self.not_matched_by_source_delete_all: Optional[bool] = None

-    def with_writer_properties(
-        self,
-        data_page_size_limit: Optional[int] = None,
-        dictionary_page_size_limit: Optional[int] = None,
-        data_page_row_count_limit: Optional[int] = None,
-        write_batch_size: Optional[int] = None,
-        max_row_group_size: Optional[int] = None,
-    ) -> "TableMerger":
-        """
-        !!! warning "Deprecated"
-            Use `.merge(writer_properties = WriterProperties())` instead
-        Pass writer properties to the Rust parquet writer, see options https://arrow.apache.org/rust/parquet/file/properties/struct.WriterProperties.html:
-
-        Args:
-            data_page_size_limit: Limit DataPage size to this in bytes.
-            dictionary_page_size_limit: Limit the size of each DataPage to store dicts to this amount in bytes.
-            data_page_row_count_limit: Limit the number of rows in each DataPage.
-            write_batch_size: Splits internally to smaller batch size.
-            max_row_group_size: Max number of rows in row group.
-
-        Returns:
-            TableMerger: TableMerger Object
-        """
-        warnings.warn(
-            "Call to deprecated method TableMerger.with_writer_properties. Use DeltaTable.merge(writer_properties=WriterProperties()) instead.",
-            category=DeprecationWarning,
-            stacklevel=2,
-        )
-
-        writer_properties: Dict[str, Any] = {
-            "data_page_size_limit": data_page_size_limit,
-            "dictionary_page_size_limit": dictionary_page_size_limit,
-            "data_page_row_count_limit": data_page_row_count_limit,
-            "write_batch_size": write_batch_size,
-            "max_row_group_size": max_row_group_size,
-        }
-        self.writer_properties = WriterProperties(**writer_properties)
-        return self
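
A hedged sketch of the replacement pattern named in the warning (the source table, join predicate, and column names are hypothetical); the five removed kwargs map one-to-one onto `WriterProperties` fields:

```python
import pyarrow as pa

from deltalake import DeltaTable
from deltalake.table import WriterProperties

dt = DeltaTable("observation_data")  # assumed existing table
source = pa.table({"id": [1], "value": [10]})  # hypothetical source data

metrics = (
    dt.merge(
        source=source,
        predicate="target.id = source.id",  # hypothetical join predicate
        source_alias="source",
        target_alias="target",
        writer_properties=WriterProperties(
            data_page_size_limit=1024 * 1024,
            max_row_group_size=128_000,
        ),
    )
    .when_matched_update(updates={"value": "source.value"})
    .execute()
)
```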

def when_matched_update(
self, updates: Dict[str, str], predicate: Optional[str] = None
) -> "TableMerger":
@@ -1997,28 +1921,9 @@ class TableOptimizer:
def __init__(self, table: DeltaTable):
self.table = table

-    def __call__(
-        self,
-        partition_filters: Optional[FilterType] = None,
-        target_size: Optional[int] = None,
-        max_concurrent_tasks: Optional[int] = None,
-    ) -> Dict[str, Any]:
-        """
-        !!! warning "DEPRECATED 0.10.0"
-            Use [compact][deltalake.table.DeltaTable.compact] instead, which has the same signature.
-        """
-
-        warnings.warn(
-            "Call to deprecated method DeltaTable.optimize. Use DeltaTable.optimize.compact() instead.",
-            category=DeprecationWarning,
-            stacklevel=2,
-        )
-
-        return self.compact(partition_filters, target_size, max_concurrent_tasks)
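​
For the removed callable form, a hedged migration sketch (table path reused from the docs example, filter values hypothetical); `compact` takes the same leading arguments, and `min_commit_interval` also accepts a `timedelta`:

```python
from datetime import timedelta

from deltalake import DeltaTable

dt = DeltaTable("observation_data")  # assumed existing table

# Formerly dt.optimize(...); now spelled explicitly:
metrics = dt.optimize.compact(
    partition_filters=[("year", "=", "2021")],
    target_size=256 * 1024 * 1024,  # 256 MiB target file size
    min_commit_interval=timedelta(minutes=10),
)
```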

def compact(
self,
-        partition_filters: Optional[FilterType] = None,
+        partition_filters: Optional[FilterConjunctionType] = None,
target_size: Optional[int] = None,
max_concurrent_tasks: Optional[int] = None,
min_commit_interval: Optional[Union[int, timedelta]] = None,
@@ -2073,7 +1978,7 @@ def compact(
min_commit_interval = int(min_commit_interval.total_seconds())

metrics = self.table._table.compact_optimize(
-            partition_filters,
+            self.table._stringify_partition_values(partition_filters),
target_size,
max_concurrent_tasks,
min_commit_interval,
@@ -2087,7 +1992,7 @@ def z_order(
def z_order(
self,
columns: Iterable[str],
-        partition_filters: Optional[FilterType] = None,
+        partition_filters: Optional[FilterConjunctionType] = None,
target_size: Optional[int] = None,
max_concurrent_tasks: Optional[int] = None,
max_spill_size: int = 20 * 1024 * 1024 * 1024,
@@ -2142,7 +2047,7 @@ def z_order(

metrics = self.table._table.z_order_optimize(
list(columns),
-            partition_filters,
+            self.table._stringify_partition_values(partition_filters),
target_size,
max_concurrent_tasks,
max_spill_size,
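
And a matching sketch for `z_order` (table path reused from the docs example, column names hypothetical); after the change above, partition filter values no longer need to be pre-stringified by the caller:

```python
from deltalake import DeltaTable

dt = DeltaTable("observation_data")  # assumed existing table

metrics = dt.optimize.z_order(
    columns=["timestamp", "station_id"],  # hypothetical columns
    partition_filters=[("year", "=", 2021)],
)
```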
22 changes: 1 addition & 21 deletions python/src/lib.rs
@@ -270,26 +270,6 @@ impl RawDeltaTable {
})
}

-    pub fn files_by_partitions(
-        &self,
-        py: Python,
-        partitions_filters: Vec<(PyBackedStr, PyBackedStr, PartitionFilterValue)>,
-    ) -> PyResult<Vec<String>> {
-        py.allow_threads(|| {
-            let partition_filters = convert_partition_filters(partitions_filters);
-            match partition_filters {
-                Ok(filters) => Ok(self
-                    ._table
-                    .get_files_by_partitions(&filters)
-                    .map_err(PythonError::from)?
-                    .into_iter()
-                    .map(|p| p.to_string())
-                    .collect()),
-                Err(err) => Err(PythonError::from(err).into()),
-            }
-        })
-    }

pub fn files(
&self,
py: Python,
@@ -961,7 +941,7 @@ impl RawDeltaTable {
) -> PyResult<Vec<(String, Option<Bound<'py, PyAny>>)>> {
let path_set = match partition_filters {
Some(filters) => Some(HashSet::<_>::from_iter(
-                self.files_by_partitions(py, filters)?.iter().cloned(),
+                self.files(py, Some(filters))?.iter().cloned(),
)),
None => None,
};