feat(chdb): support in-memory tables
kszucs committed Apr 4, 2024
1 parent af31f57 commit 516141f
Showing 2 changed files with 54 additions and 30 deletions.
76 changes: 50 additions & 26 deletions ibis/backends/chdb/__init__.py
@@ -1,6 +1,8 @@
from __future__ import annotations

import contextlib
+import pathlib
+import tempfile
from typing import TYPE_CHECKING, Any

import pyarrow as pa
@@ -11,19 +13,30 @@

import ibis.common.exceptions as com
import ibis.expr.datatypes as dt
+import ibis.expr.operations as ops
import ibis.expr.schema as sch
import ibis.expr.types as ir
from ibis import util
from ibis.backends import UrlFromPath
from ibis.backends.clickhouse import Backend as CHBackend
+from ibis.backends.clickhouse.compiler import ClickHouseCompiler
from ibis.backends.sql.compiler import C
from ibis.formats.pyarrow import PyArrowData, PyArrowType

if TYPE_CHECKING:
-    from collections.abc import Mapping
    from pathlib import Path


+class ArrowFileTable(ops.PhysicalTable):
+    path: str
+    schema: sch.Schema
+
+
+class ChdbCompiler(ClickHouseCompiler):
+    def visit_ArrowFileTable(self, node, name, path, schema):
+        return self.f.file(path, "Arrow")

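# Review note: ArrowFileTable is a new relation op that points at an Arrow IPC
# file on disk, and ChdbCompiler lowers it to ClickHouse's file() table
# function. A sketch of the SQL this should yield for a memtable persisted at
# /tmp/t.arrow (path and table name hypothetical):
#
#   SELECT * FROM file('/tmp/t.arrow', 'Arrow')
#
# chdb runs in-process, but it still cannot read Python objects directly, so
# in-memory data is handed over through the filesystem rather than through
# ClickHouse's external-table protocol.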

class ChdbArrowConverter(PyArrowData):
    @classmethod
    def convert_column(cls, column: pa.Array, dtype: dt.DataType) -> pa.Array:
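# Review note: ChdbArrowConverter normalizes the Arrow data chdb returns so it
# matches the schema ibis inferred for the query; to_pyarrow below applies it to
# the raw result. The call shape, assuming a pa.Table `raw` and an ibis schema
# `schema`:
#
#   converted = ChdbArrowConverter.convert_table(raw, schema)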
@@ -47,6 +60,7 @@ def convert_table(cls, table: pa.Table, schema: sch.Schema) -> pa.Table:

class Backend(CHBackend, UrlFromPath):
    name = "chdb"
+    compiler = ChdbCompiler()

    @property
    def version(self) -> str:
@@ -62,18 +76,13 @@ def do_connect(
        if database is not None:
            self.raw_sql(f"USE {database}")

-    def raw_sql(
-        self, query: str | sge.Expression, external_tables=None, **kwargs
-    ) -> Any:
+    def raw_sql(self, query: str | sge.Expression, **kwargs) -> Any:
        """Execute a SQL string `query` against the database.

        Parameters
        ----------
        query
            Raw SQL string
-        external_tables
-            Mapping of table name to pandas DataFrames providing
-            external datasources for the query
        kwargs
            Backend specific query arguments
@@ -83,10 +92,8 @@ def raw_sql(
        Clickhouse cursor
        """
-        if external_tables:
-            raise NotImplementedError("External tables are not yet supported")
        with contextlib.suppress(AttributeError):
-            query = query.sql(self.dialect, pretty=True)
+            query = query.sql(self.dialect)
        self._log(query)
        return self.con.query(query, **kwargs)

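# Review note: raw_sql accepts either a SQL string or a sqlglot expression; the
# contextlib.suppress(AttributeError) block makes the .sql(dialect) call a no-op
# for plain strings. A minimal usage sketch (the connect() call and its
# arguments are assumptions, not part of this diff; fmt="arrowtable" is the
# format the backend itself uses):
#
#   con = ibis.chdb.connect()
#   cursor = con.raw_sql("SELECT 1 AS x", fmt="arrowtable")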
@@ -95,7 +102,7 @@ def _safe_raw_sql(self, *args, **kwargs):
        yield self.raw_sql(*args, **kwargs)

    def get_schema(
-        self, table_name: str, database: str | None = None, schema: str | None = None
+        self, table_name: str, catalog: str | None = None, database: str | None = None
    ) -> sch.Schema:
        """Return a Schema object for the indicated table and database.
@@ -104,20 +111,20 @@ def get_schema(
        table_name
            May **not** be fully qualified. Use `database` if you want to
            qualify the identifier.
+        catalog
+            Catalog name, not supported by ClickHouse
        database
            Database name
-        schema
-            Schema name, not supported by ClickHouse

        Returns
        -------
        sch.Schema
            Ibis schema
        """
-        if schema is not None:
+        if catalog is not None:
            raise com.UnsupportedBackendFeatureError(
-                "`schema` namespaces are not supported by chdb"
+                "`catalog` namespaces are not supported by chdb"
            )
        query = sge.Describe(this=sg.table(table_name, db=database))
        table = self.raw_sql(query, fmt="arrowtable")
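# Review note: get_schema now follows the catalog/database naming convention
# used across ibis backends; ClickHouse has no catalog level, so passing one
# raises. The schema itself comes from running DESCRIBE and reading the result
# back as an Arrow table. Hypothetical call:
#
#   con.get_schema("events", database="default")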
@@ -140,19 +147,36 @@ def execute(self, expr: ir.Expr, **kwargs: Any) -> Any:
        df = self.to_pyarrow(table, **kwargs).to_pandas()
        return expr.__pandas_result__(table.__pandas_result__(df))

-    def to_pyarrow(
-        self,
-        expr: ir.Expr,
-        *,
-        external_tables: Mapping[str, Any] | None = None,
-        **kwargs: Any,
-    ):
+    @contextlib.contextmanager
+    def _persisted_memtables(self, table):
+        node = table.op()
+        memtables = node.find(ops.InMemoryTable)
+        if not memtables:
+            yield table
+            return
+
+        subs = {}
+        local = pa.fs.LocalFileSystem()
+        with tempfile.TemporaryDirectory() as tmpdir:
+            for memtable in memtables:
+                path = str(pathlib.Path(tmpdir) / f"{memtable.name}.arrow")
+                table = memtable.data.to_pyarrow(memtable.schema)
+                with local.open_output_stream(str(path)) as out:
+                    with pa.RecordBatchFileWriter(out, table.schema) as writer:
+                        writer.write_table(table)
+                subs[memtable] = ArrowFileTable(
+                    name=memtable.name, schema=memtable.schema, path=path
+                )
+
+            yield node.replace(subs).to_expr()

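# Review note: _persisted_memtables is the heart of this commit. It searches the
# expression tree for ops.InMemoryTable nodes, writes each one to an Arrow IPC
# file inside a TemporaryDirectory, substitutes ArrowFileTable nodes via
# node.replace(subs), and yields the rewritten expression while the files are
# still alive; the directory is deleted on exit. Equivalent standalone logic for
# a single table, assuming a pyarrow Table `data` (names hypothetical):
#
#   import pathlib, tempfile
#   import pyarrow as pa
#
#   with tempfile.TemporaryDirectory() as tmpdir:
#       path = pathlib.Path(tmpdir) / "t.arrow"
#       with pa.OSFile(str(path), "wb") as out:
#           with pa.RecordBatchFileWriter(out, data.schema) as writer:
#               writer.write_table(data)
#       ...  # run the query against file('<path>', 'Arrow') before cleanup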
+    def to_pyarrow(self, expr: ir.Expr, **kwargs: Any):
        table = expr.as_table()
-        sql = self.compile(table, **kwargs)
-        self._log(sql)
-        external_tables = self._collect_in_memory_tables(expr, external_tables)
+        with self._persisted_memtables(table) as table:
+            sql = self.compile(table, **kwargs)
+            self._log(sql)
+            result = self.raw_sql(sql, fmt="arrowtable")

-        result = self.raw_sql(sql, fmt="arrowtable", external_tables=external_tables)
        result = ChdbArrowConverter.convert_table(result, table.schema())
        return expr.__pyarrow_result__(result)

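# Review note: end to end, an expression over ibis.memtable(...) now round-trips
# through a temporary Arrow file instead of hitting the old NotImplementedError
# for external tables. A hedged usage sketch (connect() arguments omitted, and
# the exact SQL shape is an assumption):
#
#   import ibis
#
#   t = ibis.memtable({"a": [1, 2, 3]})
#   con = ibis.chdb.connect()
#   con.to_pyarrow(t.filter(t.a > 1))
#   # compiles to: SELECT ... FROM file('<tmpdir>/<name>.arrow', 'Arrow') ...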
8 changes: 4 additions & 4 deletions poetry.lock
