Skip to content

Commit

Permalink
feat(trino): port to sqlglot
Browse files Browse the repository at this point in the history
  • Loading branch information
cpcloud committed Jan 4, 2024
1 parent 61e34b9 commit d22fb51
Show file tree
Hide file tree
Showing 82 changed files with 4,147 additions and 2,234 deletions.
37 changes: 14 additions & 23 deletions .github/workflows/ibis-backends.yml
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,12 @@ jobs:
# - freetds-dev
# - unixodbc-dev
# - tdsodbc
# - name: trino
# title: Trino
# extras:
# - trino
# - postgres
# services:
# - trino
- name: trino
title: Trino
extras:
- trino
services:
- trino
# - name: druid
# title: Druid
# extras:
Expand Down Expand Up @@ -248,15 +247,14 @@ jobs:
# - freetds-dev
# - unixodbc-dev
# - tdsodbc
# - os: windows-latest
# backend:
# name: trino
# title: Trino
# services:
# - trino
# extras:
# - trino
# - postgres
- os: windows-latest
backend:
name: trino
title: Trino
services:
- trino
extras:
- trino
# - os: windows-latest
# backend:
# name: druid
Expand Down Expand Up @@ -685,13 +683,6 @@ jobs:
# title: SQLite
# extras:
# - sqlite
# - name: trino
# title: Trino
# services:
# - trino
# extras:
# - trino
# - postgres
# - name: oracle
# title: Oracle
# serial: true
Expand Down
74 changes: 70 additions & 4 deletions ibis/backends/base/sqlglot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
from ibis.backends.base.sqlglot.compiler import STAR

if TYPE_CHECKING:
from collections.abc import Iterator
from collections.abc import Iterable, Iterator, Mapping

import pyarrow as pa

import ibis.expr.datatypes as dt
import ibis.expr.types as ir
Expand Down Expand Up @@ -60,7 +62,7 @@ def table(
).to_expr()

def _to_sqlglot(
self, expr: ir.Expr, limit: str | None = None, params=None, **_: Any
self, expr: ir.Expr, *, limit: str | None = None, params=None, **_: Any
):
"""Compile an Ibis expression to a sqlglot object."""
table_expr = expr.as_table()
Expand Down Expand Up @@ -206,13 +208,17 @@ def _clean_up_cached_table(self, op):
self.drop_table(op.name)

def execute(
self, expr: ir.Expr, limit: str | None = "default", **kwargs: Any
self,
expr: ir.Expr,
params: Mapping | None = None,
limit: str | None = "default",
**kwargs: Any,
) -> Any:
"""Execute an expression."""

self._run_pre_execute_hooks(expr)
table = expr.as_table()
sql = self.compile(table, limit=limit, **kwargs)
sql = self.compile(table, params=params, limit=limit, **kwargs)

schema = table.schema()

Expand All @@ -236,3 +242,63 @@ def drop_table(
)
with self._safe_raw_sql(drop_stmt):
pass

def _cursor_batches(
    self,
    expr: ir.Expr,
    params: Mapping[ir.Scalar, Any] | None = None,
    limit: int | str | None = None,
    chunk_size: int = 1 << 20,
) -> Iterable[list]:
    """Yield the result rows of `expr` in chunks read from a cursor.

    This is a generator: the pre-execute hooks, compilation and query
    execution all happen on first iteration, not when it is called.

    Parameters
    ----------
    expr
        Ibis expression to compile and run.
    params
        Mapping of scalar parameter expressions to value, forwarded to
        `compile`.
    limit
        Row limit forwarded to `compile`.
    chunk_size
        Number of rows requested from the cursor per `fetchmany` call.

    Yields
    ------
    list
        Each non-empty chunk returned by `cursor.fetchmany`.
    """
    self._run_pre_execute_hooks(expr)

    query = self.compile(expr, limit=limit, params=params)
    with self._safe_raw_sql(query) as cur:
        while True:
            rows = cur.fetchmany(chunk_size)
            # An empty (falsy) chunk means the cursor is exhausted.
            if not rows:
                break
            yield rows

def to_pyarrow_batches(
    self,
    expr: ir.Expr,
    *,
    params: Mapping[ir.Scalar, Any] | None = None,
    limit: int | str | None = None,
    chunk_size: int = 1_000_000,
    **_: Any,
) -> pa.ipc.RecordBatchReader:
    """Run an expression and expose its result as pyarrow record batches.

    Row chunks produced by `_cursor_batches` are converted one at a time
    into a struct array typed from the expression's schema, and each
    struct array is then turned into a `RecordBatch`.

    Parameters
    ----------
    expr
        Ibis expression to export to pyarrow
    limit
        An integer to effect a specific row limit. A value of `None` means
        "no limit". Forwarded unchanged to `_cursor_batches`.
    params
        Mapping of scalar parameter expressions to value.
    chunk_size
        Maximum number of rows in each returned record batch.
    **_
        Any additional keyword arguments are accepted and ignored.

    Returns
    -------
    RecordBatchReader
        Collection of pyarrow `RecordBatch`s.
    """
    pa = self._import_pyarrow()

    ibis_schema = expr.as_table().schema()
    # Each row becomes one struct value, so the schema is viewed as a
    # single struct type when building the intermediate array.
    struct_type = ibis_schema.as_struct().to_pyarrow()

    row_chunks = self._cursor_batches(
        expr, params=params, limit=limit, chunk_size=chunk_size
    )
    to_record_batch = pa.RecordBatch.from_struct_array
    record_batches = (
        to_record_batch(pa.array(map(tuple, chunk), type=struct_type))
        for chunk in row_chunks
    )

    return pa.ipc.RecordBatchReader.from_batches(
        ibis_schema.to_pyarrow(), record_batches
    )
Loading

0 comments on commit d22fb51

Please sign in to comment.