Skip to content

Commit

Permalink
feat(memtable): add pyarrow.dataset support (#10206)
Browse files Browse the repository at this point in the history
Co-authored-by: Phillip Cloud <[email protected]>
  • Loading branch information
gforsyth and cpcloud authored Sep 26, 2024
1 parent 021df72 commit 428d1a3
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 8 deletions.
10 changes: 9 additions & 1 deletion ibis/backends/duckdb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1609,7 +1609,15 @@ def _in_memory_table_exists(self, name: str) -> bool:
return True

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
self.con.register(op.name, op.data.to_pyarrow(op.schema))
data = op.data
schema = op.schema

try:
obj = data.to_pyarrow_dataset(schema)
except AttributeError:
obj = data.to_pyarrow(schema)

self.con.register(op.name, obj)

def _finalize_memtable(self, name: str) -> None:
# if we don't aggressively unregister tables duckdb will keep a
Expand Down
7 changes: 4 additions & 3 deletions ibis/backends/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -933,21 +933,22 @@ def test_self_join_memory_table(backend, con, monkeypatch):
[
"bigquery",
"clickhouse",
"duckdb",
"exasol",
"impala",
"mssql",
"mysql",
"oracle",
"polars",
"postgres",
"pyspark",
"risingwave",
"snowflake",
"sqlite",
"trino",
]
],
raises=com.UnsupportedOperationError,
reason="we don't materialize datasets to avoid perf footguns",
),
pytest.mark.notimpl(["polars"], raises=NotImplementedError),
],
id="pyarrow dataset",
),
Expand Down
18 changes: 18 additions & 0 deletions ibis/expr/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import pandas as pd
import polars as pl
import pyarrow as pa
import pyarrow.dataset as ds

from ibis.expr.schema import SchemaLike

Expand Down Expand Up @@ -480,6 +481,23 @@ def _memtable_from_pyarrow_table(
).to_expr()


@_memtable.register("pyarrow.dataset.Dataset")
def _memtable_from_pyarrow_dataset(
data: ds.Dataset,
*,
name: str | None = None,
schema: SchemaLike | None = None,
columns: Iterable[str] | None = None,
):
from ibis.formats.pyarrow import PyArrowDatasetProxy

return ops.InMemoryTable(
name=name if name is not None else util.gen_name("pyarrow_memtable"),
schema=Schema.from_pyarrow(data.schema),
data=PyArrowDatasetProxy(data),
).to_expr()


@_memtable.register("polars.LazyFrame")
def _memtable_from_polars_lazyframe(data: pl.LazyFrame, **kwargs):
return _memtable_from_polars_dataframe(data.collect(), **kwargs)
Expand Down
3 changes: 0 additions & 3 deletions ibis/formats/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,6 @@ def __repr__(self) -> str:
data_repr = indent(repr(self.obj), spaces=2)
return f"{self.__class__.__name__}:\n{data_repr}"

def __len__(self) -> int:
return len(self.obj)

@abstractmethod
def to_frame(self) -> pd.DataFrame: # pragma: no cover
"""Convert this input to a pandas DataFrame."""
Expand Down
41 changes: 40 additions & 1 deletion ibis/formats/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@
import pyarrow as pa
import pyarrow_hotfix # noqa: F401

import ibis.common.exceptions as com
import ibis.expr.datatypes as dt
from ibis.expr.schema import Schema
from ibis.formats import DataMapper, SchemaMapper, TableProxy, TypeMapper
from ibis.util import V

if TYPE_CHECKING:
from collections.abc import Sequence

import pandas as pd
import polars as pl
import pyarrow.dataset as ds


_from_pyarrow_types = {
Expand Down Expand Up @@ -327,7 +331,7 @@ def convert_table(cls, table: pa.Table, schema: Schema) -> pa.Table:
return table


class PyArrowTableProxy(TableProxy):
class PyArrowTableProxy(TableProxy[V]):
def to_frame(self):
return self.obj.to_pandas()

Expand All @@ -341,3 +345,38 @@ def to_polars(self, schema: Schema) -> pl.DataFrame:

df = pl.from_arrow(self.obj)
return PolarsData.convert_table(df, schema)


class PyArrowDatasetProxy(TableProxy[V]):
ERROR_MESSAGE = """\
You are trying to use a PyArrow Dataset with a backend that will require
materializing the entire dataset in local memory.
If you would like to materialize this dataset, please construct the memtable
directly by running `ibis.memtable(my_dataset.to_table())`."""

__slots__ = ("obj",)
obj: V

def __init__(self, obj: V) -> None:
self.obj = obj

# pyarrow datasets are hashable, so we override the hash from TableProxy
def __hash__(self):
return hash(self.obj)

def to_frame(self) -> pd.DataFrame:
raise com.UnsupportedOperationError(self.ERROR_MESSAGE)

def to_pyarrow(self, schema: Schema) -> pa.Table:
raise com.UnsupportedOperationError(self.ERROR_MESSAGE)

def to_pyarrow_dataset(self, schema: Schema) -> ds.Dataset:
"""Return the dataset object itself.
Use with backends that can perform pushdowns into dataset objects.
"""
return self.obj

def to_polars(self, schema: Schema) -> pa.Table:
raise com.UnsupportedOperationError(self.ERROR_MESSAGE)

0 comments on commit 428d1a3

Please sign in to comment.