diff --git a/ibis/backends/base/__init__.py b/ibis/backends/base/__init__.py
index 459d78d64c4f0..2d5303b6f3be7 100644
--- a/ibis/backends/base/__init__.py
+++ b/ibis/backends/base/__init__.py
@@ -22,6 +22,7 @@
 import ibis.expr.types as ir
 from ibis import util
 from ibis.common.caching import RefCountedCache
+from ibis.formats.pandas import PandasData
 
 if TYPE_CHECKING:
     from collections.abc import Iterable, Iterator, Mapping, MutableMapping
@@ -223,6 +224,8 @@ def _ipython_key_completions_(self) -> list[str]:
 
 
 class _FileIOHandler:
+    pandas_converter = PandasData
+
     @staticmethod
     def _import_pyarrow():
         try:
@@ -234,6 +237,46 @@ def _import_pyarrow():
         else:
             return pyarrow
 
+    def to_pandas_batches(
+        self,
+        expr: ir.Expr,
+        *,
+        params: Mapping[ir.Scalar, Any] | None = None,
+        limit: int | str | None = None,
+        chunk_size: int = 1_000_000,
+        **kwargs: Any,
+    ) -> Iterator[pd.DataFrame]:
+        """Execute an Ibis expression and return an iterator of pandas `DataFrame`s.
+
+        Parameters
+        ----------
+        expr
+            Ibis expression to execute
+        params
+            Mapping of scalar parameter expressions to value.
+        limit
+            An integer to effect a specific row limit. A value of `None` means
+            "no limit". The default is in `ibis/config.py`.
+        chunk_size
+            Maximum number of rows in each returned record batch. This may have
+            no effect depending on the backend.
+        kwargs
+            Keyword arguments
+
+        Returns
+        -------
+        Iterator[pd.DataFrame]
+            An iterator of pandas `DataFrame`s
+        """
+
+        schema = expr.schema()
+        yield from (
+            self.pandas_converter.convert_table(batch.to_pandas(), schema)
+            for batch in self.to_pyarrow_batches(
+                expr, params=params, limit=limit, chunk_size=chunk_size, **kwargs
+            )
+        )
+
     @util.experimental
     def to_pyarrow(
         self,
diff --git a/ibis/backends/dask/__init__.py b/ibis/backends/dask/__init__.py
index 88ef89a9cdf65..55fe48fc660fe 100644
--- a/ibis/backends/dask/__init__.py
+++ b/ibis/backends/dask/__init__.py
@@ -29,6 +29,7 @@ class Backend(BasePandasBackend):
     name = "dask"
     backend_table_type = dd.DataFrame
+    pandas_converter = DaskData
 
     def do_connect(
         self,
diff --git a/ibis/backends/snowflake/__init__.py b/ibis/backends/snowflake/__init__.py
index f875ef66cda2e..3716063175e5e 100644
--- a/ibis/backends/snowflake/__init__.py
+++ b/ibis/backends/snowflake/__init__.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import contextlib
+import functools
 import glob
 import inspect
 import itertools
@@ -115,6 +116,7 @@ class Backend(AlchemyCrossSchemaBackend, CanCreateDatabase, AlchemyCanCreateSche
     supports_create_or_replace = True
     supports_python_udfs = True
     use_stmt_prefix = "USE SCHEMA"
+    pandas_converter = SnowflakePandasData
 
     _latest_udf_python_version = (3, 10)
 
@@ -372,7 +374,26 @@ def fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:
         if (table := cursor.cursor.fetch_arrow_all()) is None:
             table = schema.to_pyarrow().empty_table()
         df = table.to_pandas(timestamp_as_object=True)
-        return SnowflakePandasData.convert_table(df, schema)
+        return self.pandas_converter.convert_table(df, schema)
+
+    def to_pandas_batches(
+        self,
+        expr: ir.Expr,
+        *,
+        params: Mapping[ir.Scalar, Any] | None = None,
+        limit: int | str | None = None,
+        **_: Any,
+    ) -> Iterator[pd.DataFrame]:
+        self._run_pre_execute_hooks(expr)
+        query_ast = self.compiler.to_ast_ensure_limit(expr, limit, params=params)
+        sql = query_ast.compile()
+        target_schema = expr.as_table().schema()
+        converter = functools.partial(
+            self.pandas_converter.convert_table, schema=target_schema
+        )
+
+        with self.begin() as con, contextlib.closing(con.execute(sql)) as cur:
+            yield from map(converter, cur.cursor.fetch_pandas_batches())
 
     def to_pyarrow_batches(
         self,
diff --git a/ibis/backends/tests/test_export.py b/ibis/backends/tests/test_export.py
index 7d8c4bbc63abf..a4f076b12f964 100644
--- a/ibis/backends/tests/test_export.py
+++ b/ibis/backends/tests/test_export.py
@@ -445,3 +445,12 @@ def test_empty_memtable(backend, con):
     table = ibis.memtable(expected)
     result = con.execute(table)
     backend.assert_frame_equal(result, expected)
+
+
+@pytest.mark.notimpl(["dask", "druid", "flink", "impala", "pyspark"])
+def test_to_pandas_batches(backend, con):
+    t = backend.functional_alltypes
+    n = t.count().execute()
+
+    assert sum(map(len, con.to_pandas_batches(t))) == n
+    assert sum(map(len, t.to_pandas_batches())) == n
diff --git a/ibis/expr/types/core.py b/ibis/expr/types/core.py
index 64359c1c68370..90be4745179c9 100644
--- a/ibis/expr/types/core.py
+++ b/ibis/expr/types/core.py
@@ -422,6 +422,44 @@ def to_pyarrow(
             self, params=params, limit=limit, **kwargs
         )
 
+    @experimental
+    def to_pandas_batches(
+        self,
+        *,
+        limit: int | str | None = None,
+        params: Mapping[ir.Value, Any] | None = None,
+        chunk_size: int = 1_000_000,
+        **kwargs: Any,
+    ) -> Iterator[pd.DataFrame]:
+        """Execute expression and return an iterator of pandas DataFrames.
+
+        This method is eager and will execute the associated expression
+        immediately.
+
+        Parameters
+        ----------
+        limit
+            An integer to effect a specific row limit. A value of `None` means
+            "no limit". The default is in `ibis/config.py`.
+        params
+            Mapping of scalar parameter expressions to value.
+        chunk_size
+            Maximum number of rows in each returned batch.
+        kwargs
+            Keyword arguments
+
+        Returns
+        -------
+        Iterator[pd.DataFrame]
+        """
+        return self._find_backend(use_default=True).to_pandas_batches(
+            self,
+            params=params,
+            limit=limit,
+            chunk_size=chunk_size,
+            **kwargs,
+        )
+
     @experimental
     def to_parquet(
         self,
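
Example usage (a minimal sketch, not part of the patch): the snippet below exercises both new entry points. DuckDB, the table name, the memtable contents, and the chunk_size value are arbitrary choices for illustration; any backend that implements to_pyarrow_batches (or overrides to_pandas_batches, as Snowflake does above) should behave the same way, because the base implementation simply feeds Arrow batches through the backend's pandas_converter.

import ibis

# Illustrative setup only: an in-memory DuckDB connection and a toy table.
con = ibis.duckdb.connect()
t = con.create_table("events", ibis.memtable({"x": list(range(10))}))

# Expression-level API: each element is a pandas DataFrame holding up to
# `chunk_size` rows (backends may ignore the size hint).
total = sum(len(df) for df in t.to_pandas_batches(chunk_size=4))

# Backend-level API: the same result via the connection object.
assert total == sum(len(df) for df in con.to_pandas_batches(t, chunk_size=4))
assert total == 10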