Skip to content

Commit

Permalink
refactor(sqlglot): various sqlglot compiler and backend clean ups (ibis-project#7904)
Browse files Browse the repository at this point in the history

This PR pulls out some changes from ibis-project#7871, including some removal of
redundant translation rules and fixing some xpassing tests
  • Loading branch information
cpcloud authored and ncclementi committed Feb 21, 2024
1 parent 34e19f1 commit c9cc9e5
Show file tree
Hide file tree
Showing 27 changed files with 271 additions and 183 deletions.
74 changes: 70 additions & 4 deletions ibis/backends/base/sqlglot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
from ibis.backends.base.sqlglot.compiler import STAR

if TYPE_CHECKING:
from collections.abc import Iterator
from collections.abc import Iterable, Iterator, Mapping

import pyarrow as pa

import ibis.expr.datatypes as dt
import ibis.expr.types as ir
Expand Down Expand Up @@ -60,7 +62,7 @@ def table(
).to_expr()

def _to_sqlglot(
self, expr: ir.Expr, limit: str | None = None, params=None, **_: Any
self, expr: ir.Expr, *, limit: str | None = None, params=None, **_: Any
):
"""Compile an Ibis expression to a sqlglot object."""
table_expr = expr.as_table()
Expand Down Expand Up @@ -206,13 +208,17 @@ def _clean_up_cached_table(self, op):
self.drop_table(op.name)

def execute(
self, expr: ir.Expr, limit: str | None = "default", **kwargs: Any
self,
expr: ir.Expr,
params: Mapping | None = None,
limit: str | None = "default",
**kwargs: Any,
) -> Any:
"""Execute an expression."""

self._run_pre_execute_hooks(expr)
table = expr.as_table()
sql = self.compile(table, limit=limit, **kwargs)
sql = self.compile(table, params=params, limit=limit, **kwargs)

schema = table.schema()

Expand All @@ -236,3 +242,63 @@ def drop_table(
)
with self._safe_raw_sql(drop_stmt):
pass

def _cursor_batches(
    self,
    expr: ir.Expr,
    params: Mapping[ir.Scalar, Any] | None = None,
    limit: int | str | None = None,
    chunk_size: int = 1 << 20,
) -> Iterable[list]:
    """Yield raw DB-API row batches for `expr`, each at most `chunk_size` rows.

    Runs any registered pre-execute hooks, compiles the expression to SQL,
    then streams result rows from the cursor until exhausted.
    """
    self._run_pre_execute_hooks(expr)

    query = self.compile(expr, limit=limit, params=params)
    with self._safe_raw_sql(query) as cursor:
        batch = cursor.fetchmany(chunk_size)
        while batch:
            yield batch
            batch = cursor.fetchmany(chunk_size)

def to_pyarrow_batches(
    self,
    expr: ir.Expr,
    *,
    params: Mapping[ir.Scalar, Any] | None = None,
    limit: int | str | None = None,
    chunk_size: int = 1_000_000,
    **_: Any,
) -> pa.ipc.RecordBatchReader:
    """Execute expression and return an iterator of pyarrow record batches.

    This method is eager and will execute the associated expression
    immediately.

    Parameters
    ----------
    expr
        Ibis expression to export to pyarrow
    limit
        An integer to effect a specific row limit. A value of `None` means
        "no limit". The default is in `ibis/config.py`.
    params
        Mapping of scalar parameter expressions to value.
    chunk_size
        Maximum number of rows in each returned record batch.

    Returns
    -------
    RecordBatchReader
        Collection of pyarrow `RecordBatch`s.
    """
    pa = self._import_pyarrow()

    table_schema = expr.as_table().schema()
    struct_type = table_schema.as_struct().to_pyarrow()

    def produce_batches():
        # Lazily convert each raw cursor batch of row tuples into a
        # pyarrow struct array, then into a RecordBatch.
        for rows in self._cursor_batches(
            expr, params=params, limit=limit, chunk_size=chunk_size
        ):
            struct_array = pa.array(map(tuple, rows), type=struct_type)
            yield pa.RecordBatch.from_struct_array(struct_array)

    return pa.ipc.RecordBatchReader.from_batches(
        table_schema.to_pyarrow(), produce_batches()
    )
Loading

0 comments on commit c9cc9e5

Please sign in to comment.