feat(delta-io#2597): allow pyarrow.dataset.Expression in filters kwarg
giacomo.rebecchi authored and ion-elgreco committed Jun 16, 2024
1 parent df6e6fd commit 1561d63
Showing 3 changed files with 24 additions and 119 deletions.
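In practice, the change lets `DeltaTable.to_pyarrow_table` and `DeltaTable.to_pandas` accept either the existing DNF tuple syntax or a `pyarrow.dataset.Expression`. A minimal sketch of the two now-equivalent call styles (the table path is hypothetical; the column name mirrors the test data):

```python
import pyarrow.dataset as ds
from deltalake import DeltaTable

dt = DeltaTable("path/to/delta_table")  # hypothetical table location

# DNF tuple form, as before:
df_dnf = dt.to_pandas(filters=[("date", ">", "2021-02-20")])

# New: pass a pyarrow.dataset.Expression directly.
df_expr = dt.to_pandas(filters=ds.field("date") > "2021-02-20")

assert len(df_dnf) == len(df_expr)
```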
128 changes: 10 additions & 118 deletions python/deltalake/table.py
@@ -1,10 +1,8 @@
import json
-import operator
import warnings
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum
-from functools import reduce
from pathlib import Path
from typing import (
TYPE_CHECKING,
@@ -19,7 +17,6 @@
Optional,
Tuple,
Union,
-    cast,
)

import pyarrow
@@ -34,6 +31,11 @@
ParquetReadOptions,
)

+try:
+    from pyarrow.parquet import filters_to_expression  # pyarrow >= 10.0.0
+except ImportError:
+    from pyarrow.parquet import _filters_to_expression as filters_to_expression
+
if TYPE_CHECKING:
import os
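For context: the local `_filters_to_expression` helper removed below is superseded by pyarrow's own converter, which became public as `filters_to_expression` in pyarrow 10.0.0 (hence the `ImportError` fallback to the private name on older versions). The helper turns DNF tuples into a `pyarrow.dataset.Expression` and, in the pyarrow versions targeted here, appears to pass an existing `Expression` through unchanged, which is what lets a single code path serve both filter forms. A rough sketch of the behavior being relied on:

```python
import pyarrow.dataset as ds

try:
    from pyarrow.parquet import filters_to_expression  # pyarrow >= 10.0.0
except ImportError:
    from pyarrow.parquet import _filters_to_expression as filters_to_expression

# A conjunction of tuples becomes one combined Expression,
# equivalent to (ds.field("x") == "a") & ds.field("y").isin(["b", "c"]).
expr = filters_to_expression([("x", "=", "a"), ("y", "in", ["b", "c"])])

# An Expression argument comes back as-is (assumed pass-through behavior).
assert filters_to_expression(expr).equals(expr)
```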

@@ -260,116 +262,6 @@ class ProtocolVersions(NamedTuple):
FilterType = Union[FilterConjunctionType, FilterDNFType]


-def _check_contains_null(value: Any) -> bool:
-    """
-    Check if target contains nullish value.
-    """
-    if isinstance(value, bytes):
-        for byte in value:
-            if isinstance(byte, bytes):
-                compare_to = chr(0)
-            else:
-                compare_to = 0
-            if byte == compare_to:
-                return True
-    elif isinstance(value, str):
-        return "\x00" in value
-    return False
-
-
-def _check_dnf(
-    dnf: FilterDNFType,
-    check_null_strings: bool = True,
-) -> FilterDNFType:
-    """
-    Check if DNF are well-formed.
-    """
-    if len(dnf) == 0 or any(len(c) == 0 for c in dnf):
-        raise ValueError("Malformed DNF")
-    if check_null_strings:
-        for conjunction in dnf:
-            for col, op, val in conjunction:
-                if (
-                    isinstance(val, list)
-                    and all(_check_contains_null(v) for v in val)
-                    or _check_contains_null(val)
-                ):
-                    raise NotImplementedError(
-                        "Null-terminated binary strings are not supported "
-                        "as filter values."
-                    )
-    return dnf
-
-
-def _convert_single_predicate(column: str, op: str, value: Any) -> Expression:
-    """
-    Convert given `tuple` to [pyarrow.dataset.Expression].
-    """
-    import pyarrow.dataset as ds
-
-    field = ds.field(column)
-    if op == "=" or op == "==":
-        return field == value
-    elif op == "!=":
-        return field != value
-    elif op == "<":
-        return field < value
-    elif op == ">":
-        return field > value
-    elif op == "<=":
-        return field <= value
-    elif op == ">=":
-        return field >= value
-    elif op == "in":
-        return field.isin(value)
-    elif op == "not in":
-        return ~field.isin(value)
-    else:
-        raise ValueError(
-            f'"{(column, op, value)}" is not a valid operator in predicates.'
-        )
-
-
-def _filters_to_expression(filters: FilterType) -> Expression:
-    """
-    Check if filters are well-formed and convert to an [pyarrow.dataset.Expression].
-    """
-    if isinstance(filters[0][0], str):
-        # We have encountered the situation where we have one nesting level too few:
-        # We have [(,,), ..] instead of [[(,,), ..]]
-        dnf = cast(FilterDNFType, [filters])
-    else:
-        dnf = cast(FilterDNFType, filters)
-    dnf = _check_dnf(dnf, check_null_strings=False)
-    disjunction_members = []
-    for conjunction in dnf:
-        conjunction_members = [
-            _convert_single_predicate(col, op, val) for col, op, val in conjunction
-        ]
-        disjunction_members.append(reduce(operator.and_, conjunction_members))
-    return reduce(operator.or_, disjunction_members)
-
-
-_DNF_filter_doc = """
-Predicates are expressed in disjunctive normal form (DNF), like [("x", "=", "a"), ...].
-DNF allows arbitrary boolean logical combinations of single partition predicates.
-The innermost tuples each describe a single partition predicate. The list of inner
-predicates is interpreted as a conjunction (AND), forming a more selective and
-multiple partition predicates. Each tuple has format: (key, op, value) and compares
-the key with the value. The supported op are: `=`, `!=`, `in`, and `not in`. If
-the op is in or not in, the value must be a collection such as a list, a set or a tuple.
-The supported type for value is str. Use empty string `''` for Null partition value.
-Example:
-```
-("x", "=", "a")
-("x", "!=", "a")
-("y", "in", ["a", "b", "c"])
-("z", "not in", ["a","b"])
-```
-"""


@dataclass(init=False)
class DeltaTable:
"""Represents a Delta Table"""
@@ -1145,7 +1037,7 @@ def to_pyarrow_table(
partitions: Optional[List[Tuple[str, str, Any]]] = None,
columns: Optional[List[str]] = None,
filesystem: Optional[Union[str, pa_fs.FileSystem]] = None,
-        filters: Optional[FilterType] = None,
+        filters: Optional[Union[FilterType, Expression]] = None,
) -> pyarrow.Table:
"""
Build a PyArrow Table using data from the DeltaTable.
@@ -1154,10 +1046,10 @@ def to_pyarrow_table(
partitions: A list of partition filters, see help(DeltaTable.files_by_partitions) for filter syntax
columns: The columns to project. This can be a list of column names to include (order and duplicates will be preserved)
filesystem: A concrete implementation of the Pyarrow FileSystem or a fsspec-compatible interface. If None, the first file path will be used to determine the right FileSystem
-            filters: A disjunctive normal form (DNF) predicate for filtering rows. If you pass a filter you do not need to pass ``partitions``
+            filters: A disjunctive normal form (DNF) predicate for filtering rows, or directly a pyarrow.dataset.Expression. If you pass a filter you do not need to pass ``partitions``
"""
if filters is not None:
-            filters = _filters_to_expression(filters)
+            filters = filters_to_expression(filters)
return self.to_pyarrow_dataset(
partitions=partitions, filesystem=filesystem
).to_table(columns=columns, filter=filters)
@@ -1167,7 +1059,7 @@ def to_pandas(
partitions: Optional[List[Tuple[str, str, Any]]] = None,
columns: Optional[List[str]] = None,
filesystem: Optional[Union[str, pa_fs.FileSystem]] = None,
-        filters: Optional[FilterType] = None,
+        filters: Optional[Union[FilterType, Expression]] = None,
) -> "pd.DataFrame":
"""
Build a pandas dataframe using data from the DeltaTable.
@@ -1176,7 +1068,7 @@ def to_pandas(
partitions: A list of partition filters, see help(DeltaTable.files_by_partitions) for filter syntax
columns: The columns to project. This can be a list of column names to include (order and duplicates will be preserved)
filesystem: A concrete implementation of the Pyarrow FileSystem or a fsspec-compatible interface. If None, the first file path will be used to determine the right FileSystem
-            filters: A disjunctive normal form (DNF) predicate for filtering rows. If you pass a filter you do not need to pass ``partitions``
+            filters: A disjunctive normal form (DNF) predicate for filtering rows, or directly a pyarrow.dataset.Expression. If you pass a filter you do not need to pass ``partitions``
"""
return self.to_pyarrow_table(
partitions=partitions,
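Because the kwarg is now forwarded straight to the converter, composed predicates that are awkward to spell as DNF tuples can be passed as-is, as the updated tests below exercise. A hedged sketch reusing `dt` from the earlier example (column names mirror the test data):

```python
import pyarrow.dataset as ds

# An OR across two different columns, built with Expression operators:
filter_expr = (ds.field("date") > "2021-02-20") | ds.field("state").isin(
    ["Alabama", "Wyoming"]
)

df = dt.to_pandas(filters=filter_expr)
```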
7 changes: 7 additions & 0 deletions python/stubs/pyarrow/parquet.pyi
@@ -0,0 +1,7 @@
+from typing import Callable
+
+from deltalake.table import FilterType
+from pyarrow.dataset import Expression
+
+filters_to_expression: Callable[[FilterType], Expression]
+_filters_to_expression: Callable[[FilterType], Expression]
8 changes: 7 additions & 1 deletion python/tests/test_table_read.py
@@ -507,7 +507,11 @@ def test_delta_table_with_filters():

filter_expr = ds.field("date") > "2021-02-20"
data = dataset.to_table(filter=filter_expr)
-    assert len(dt.to_pandas(filters=[("date", ">", "2021-02-20")])) == data.num_rows
+    assert (
+        len(dt.to_pandas(filters=[("date", ">", "2021-02-20")]))
+        == len(dt.to_pandas(filters=filter_expr))
+        == data.num_rows
+    )

filter_expr = (ds.field("date") > "2021-02-20") | (
ds.field("state").isin(["Alabama", "Wyoming"])
@@ -522,6 +526,7 @@ def test_delta_table_with_filters():
]
)
)
+        == len(dt.to_pandas(filters=filter_expr))
== data.num_rows
)

@@ -538,6 +543,7 @@ def test_delta_table_with_filters():
]
)
)
+        == len(dt.to_pandas(filters=filter_expr))
== data.num_rows
)

