Skip to content

Commit

Permalink
feat(api): add to_pandas_batches
Browse files Browse the repository at this point in the history
  • Loading branch information
cpcloud committed Sep 28, 2023
1 parent 1dea9ca commit 7c23aa9
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 1 deletion.
46 changes: 46 additions & 0 deletions ibis/backends/base/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import ibis.expr.types as ir
from ibis import util
from ibis.common.caching import RefCountedCache
from ibis.formats.pandas import PandasData

if TYPE_CHECKING:
from collections.abc import Iterable, Iterator, Mapping, MutableMapping
Expand Down Expand Up @@ -223,6 +224,8 @@ def _ipython_key_completions_(self) -> list[str]:


class _FileIOHandler:
pandas_converter = PandasData

@staticmethod
def _import_pyarrow():
try:
Expand All @@ -234,6 +237,49 @@ def _import_pyarrow():
else:
return pyarrow

def to_pandas_batches(
self,
expr: ir.Expr,
*,
params: Mapping[ir.Scalar, Any] | None = None,
limit: int | str | None = None,
chunk_size: int = 1_000_000,
**kwargs: Any,
) -> Iterator[pd.DataFrame]:
"""Execute an Ibis expression and return an iterator of pandas `DataFrame`s.
Parameters
----------
expr
Ibis expression to execute
params
Mapping of scalar parameter expressions to value.
limit
An integer to effect a specific row limit. A value of `None` means
"no limit". The default is in `ibis/config.py`.
chunk_size
Maximum number of rows in each returned record batch. This may have
no effect depending on the backend.
kwargs
Keyword arguments
Returns
-------
Iterator[pd.DataFrame]
An iterator pandas `DataFrame`s
"""
orig_expr = expr
expr = expr.as_table()
schema = expr.schema()
yield from (
orig_expr.__pandas_result__(
self.pandas_converter.convert_table(batch.to_pandas(), schema)
)
for batch in self.to_pyarrow_batches(
expr, params=params, limit=limit, chunk_size=chunk_size, **kwargs
)
)

@util.experimental
def to_pyarrow(
self,
Expand Down
1 change: 1 addition & 0 deletions ibis/backends/dask/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
class Backend(BasePandasBackend):
name = "dask"
backend_table_type = dd.DataFrame
pandas_converter = DaskData

def do_connect(
self,
Expand Down
23 changes: 22 additions & 1 deletion ibis/backends/snowflake/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import contextlib
import functools
import glob
import inspect
import itertools
Expand Down Expand Up @@ -115,6 +116,7 @@ class Backend(AlchemyCrossSchemaBackend, CanCreateDatabase, AlchemyCanCreateSche
supports_create_or_replace = True
supports_python_udfs = True
use_stmt_prefix = "USE SCHEMA"
pandas_converter = SnowflakePandasData

_latest_udf_python_version = (3, 10)

Expand Down Expand Up @@ -372,7 +374,26 @@ def fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:
if (table := cursor.cursor.fetch_arrow_all()) is None:
table = schema.to_pyarrow().empty_table()
df = table.to_pandas(timestamp_as_object=True)
return SnowflakePandasData.convert_table(df, schema)
return self.pandas_converter.convert_table(df, schema)

def to_pandas_batches(
self,
expr: ir.Expr,
*,
params: Mapping[ir.Scalar, Any] | None = None,
limit: int | str | None = None,
**_: Any,
) -> pa.ipc.RecordBatchReader:
self._run_pre_execute_hooks(expr)
query_ast = self.compiler.to_ast_ensure_limit(expr, limit, params=params)
sql = query_ast.compile()
target_schema = expr.as_table().schema()
converter = functools.partial(
self.pandas_converter.convert_table, schema=target_schema
)

with self.begin() as con, contextlib.closing(con.execute(sql)) as cur:
yield from map(converter, cur.cursor.fetch_pandas_batches())

def to_pyarrow_batches(
self,
Expand Down
32 changes: 32 additions & 0 deletions ibis/backends/tests/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,3 +445,35 @@ def test_empty_memtable(backend, con):
table = ibis.memtable(expected)
result = con.execute(table)
backend.assert_frame_equal(result, expected)


@pytest.mark.notimpl(["dask", "druid", "flink", "impala", "pyspark"])
@pytest.mark.parametrize("n", [None, 0, 1, 2])
def test_to_pandas_batches_table(backend, con, n):
t = backend.functional_alltypes.limit(n)
n = t.count().execute()

assert sum(map(len, con.to_pandas_batches(t))) == n
assert sum(map(len, t.to_pandas_batches())) == n


@pytest.mark.notimpl(["dask", "druid", "flink", "impala", "pyspark"])
@pytest.mark.parametrize("n", [None, 0, 1, 2])
def test_to_pandas_batches_column(backend, con, n):
t = backend.functional_alltypes.limit(n).timestamp_col
n = t.count().execute()

assert sum(map(len, con.to_pandas_batches(t))) == n
assert sum(map(len, t.to_pandas_batches())) == n


@pytest.mark.notimpl(["dask", "druid", "flink", "impala", "pyspark"])
def test_to_pandas_batches_scalar(backend, con):
t = backend.functional_alltypes.timestamp_col.max()
expected = t.execute()

result1 = list(con.to_pandas_batches(t))
assert result1 == [expected]

result2 = list(t.to_pandas_batches())
assert result2 == [expected]
38 changes: 38 additions & 0 deletions ibis/expr/types/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,44 @@ def to_pyarrow(
self, params=params, limit=limit, **kwargs
)

@experimental
def to_pandas_batches(
self,
*,
limit: int | str | None = None,
params: Mapping[ir.Value, Any] | None = None,
chunk_size: int = 1_000_000,
**kwargs: Any,
) -> pa.ipc.RecordBatchReader:
"""Execute expression and return an iterator of pandas DataFrames.
This method is eager and will execute the associated expression
immediately.
Parameters
----------
limit
An integer to effect a specific row limit. A value of `None` means
"no limit". The default is in `ibis/config.py`.
params
Mapping of scalar parameter expressions to value.
chunk_size
Maximum number of rows in each returned batch.
kwargs
Keyword arguments
Returns
-------
Iterator[pd.DataFrame]
"""
return self._find_backend(use_default=True).to_pandas_batches(
self,
params=params,
limit=limit,
chunk_size=chunk_size,
**kwargs,
)

@experimental
def to_parquet(
self,
Expand Down

0 comments on commit 7c23aa9

Please sign in to comment.