From 516141f79acfeb3870c4658c9df795ced3477675 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?=
Date: Thu, 4 Apr 2024 13:04:04 +0200
Subject: [PATCH] feat(chdb): support in-memory tables

---
 ibis/backends/chdb/__init__.py | 76 ++++++++++++++++++++++------------
 poetry.lock                    |  8 ++--
 2 files changed, 54 insertions(+), 30 deletions(-)

diff --git a/ibis/backends/chdb/__init__.py b/ibis/backends/chdb/__init__.py
index bcc676324e989..7561f3ad2662f 100644
--- a/ibis/backends/chdb/__init__.py
+++ b/ibis/backends/chdb/__init__.py
@@ -1,6 +1,8 @@
 from __future__ import annotations
 
 import contextlib
+import pathlib
+import tempfile
 from typing import TYPE_CHECKING, Any
 
 import pyarrow as pa
@@ -11,19 +13,30 @@
 
 import ibis.common.exceptions as com
 import ibis.expr.datatypes as dt
+import ibis.expr.operations as ops
 import ibis.expr.schema as sch
 import ibis.expr.types as ir
 from ibis import util
 from ibis.backends import UrlFromPath
 from ibis.backends.clickhouse import Backend as CHBackend
+from ibis.backends.clickhouse.compiler import ClickHouseCompiler
 from ibis.backends.sql.compiler import C
 from ibis.formats.pyarrow import PyArrowData, PyArrowType
 
 if TYPE_CHECKING:
-    from collections.abc import Mapping
     from pathlib import Path
 
 
+class ArrowFileTable(ops.PhysicalTable):
+    path: str
+    schema: sch.Schema
+
+
+class ChdbCompiler(ClickHouseCompiler):
+    def visit_ArrowFileTable(self, node, name, path, schema):
+        return self.f.file(path, "Arrow")
+
+
 class ChdbArrowConverter(PyArrowData):
     @classmethod
     def convert_column(cls, column: pa.Array, dtype: dt.DataType) -> pa.Array:
@@ -47,6 +60,7 @@ def convert_table(cls, table: pa.Table, schema: sch.Schema) -> pa.Table:
 
 class Backend(CHBackend, UrlFromPath):
     name = "chdb"
+    compiler = ChdbCompiler()
 
     @property
     def version(self) -> str:
@@ -62,18 +76,13 @@ def do_connect(
         if database is not None:
             self.raw_sql(f"USE {database}")
 
-    def raw_sql(
-        self, query: str | sge.Expression, external_tables=None, **kwargs
-    ) -> Any:
+    def raw_sql(self, query: str | sge.Expression, **kwargs) -> Any:
         """Execute a SQL string `query` against the database.
 
         Parameters
         ----------
         query
             Raw SQL string
-        external_tables
-            Mapping of table name to pandas DataFrames providing
-            external datasources for the query
         kwargs
             Backend specific query arguments
 
@@ -83,10 +92,8 @@
         Returns
         -------
             Clickhouse cursor
         """
-        if external_tables:
-            raise NotImplementedError("External tables are not yet supported")
         with contextlib.suppress(AttributeError):
-            query = query.sql(self.dialect, pretty=True)
+            query = query.sql(self.dialect)
         self._log(query)
         return self.con.query(query, **kwargs)
@@ -95,29 +102,29 @@ def _safe_raw_sql(self, *args, **kwargs):
         yield self.raw_sql(*args, **kwargs)
 
     def get_schema(
-        self, table_name: str, database: str | None = None, schema: str | None = None
+        self, table_name: str, catalog: str | None = None, database: str | None = None
     ) -> sch.Schema:
         """Return a Schema object for the indicated table and database.
 
         Parameters
         ----------
         table_name
            May **not** be fully qualified. Use `database` if you want to
            qualify the identifier.
+        catalog
+            Catalog name, not supported by ClickHouse
         database
             Database name
-        schema
-            Schema name, not supported by ClickHouse
 
         Returns
         -------
         sch.Schema
             Ibis schema
 
         """
-        if schema is not None:
+        if catalog is not None:
             raise com.UnsupportedBackendFeatureError(
-                "`schema` namespaces are not supported by chdb"
+                "`catalog` namespaces are not supported by chdb"
             )
         query = sge.Describe(this=sg.table(table_name, db=database))
         table = self.raw_sql(query, fmt="arrowtable")
@@ -140,19 +147,36 @@ def execute(self, expr: ir.Expr, **kwargs: Any) -> Any:
         df = self.to_pyarrow(table, **kwargs).to_pandas()
         return expr.__pandas_result__(table.__pandas_result__(df))
 
-    def to_pyarrow(
-        self,
-        expr: ir.Expr,
-        *,
-        external_tables: Mapping[str, Any] | None = None,
-        **kwargs: Any,
-    ):
+    @contextlib.contextmanager
+    def _persisted_memtables(self, table):
+        node = table.op()
+        memtables = node.find(ops.InMemoryTable)
+        if not memtables:
+            yield table
+            return
+
+        subs = {}
+        local = pa.fs.LocalFileSystem()
+        with tempfile.TemporaryDirectory() as tmpdir:
+            for memtable in memtables:
+                path = str(pathlib.Path(tmpdir) / f"{memtable.name}.arrow")
+                table = memtable.data.to_pyarrow(memtable.schema)
+                with local.open_output_stream(str(path)) as out:
+                    with pa.RecordBatchFileWriter(out, table.schema) as writer:
+                        writer.write_table(table)
+                subs[memtable] = ArrowFileTable(
+                    name=memtable.name, schema=memtable.schema, path=path
+                )
+
+            yield node.replace(subs).to_expr()
+
+    def to_pyarrow(self, expr: ir.Expr, **kwargs: Any):
         table = expr.as_table()
-        sql = self.compile(table, **kwargs)
-        self._log(sql)
-        external_tables = self._collect_in_memory_tables(expr, external_tables)
+        with self._persisted_memtables(table) as table:
+            sql = self.compile(table, **kwargs)
+            self._log(sql)
+            result = self.raw_sql(sql, fmt="arrowtable")
 
-        result = self.raw_sql(sql, fmt="arrowtable", external_tables=external_tables)
         result = ChdbArrowConverter.convert_table(result, table.schema())
         return expr.__pyarrow_result__(result)
 
diff --git a/poetry.lock b/poetry.lock
index d9ddce1defc95..020d876614ac3 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand.
+# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand.
 
 [[package]]
 name = "aiohttp"
@@ -3450,6 +3450,7 @@ files = [
     {file = "msgpack-1.0.8-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:5fbb160554e319f7b22ecf530a80a3ff496d38e8e07ae763b9e82fadfe96f273"},
     {file = "msgpack-1.0.8-cp39-cp39-win32.whl", hash = "sha256:f9af38a89b6a5c04b7d18c492c8ccf2aee7048aff1ce8437c4683bb5a1df893d"},
     {file = "msgpack-1.0.8-cp39-cp39-win_amd64.whl", hash = "sha256:ed59dd52075f8fc91da6053b12e8c89e37aa043f8986efd89e61fae69dc1b011"},
+    {file = "msgpack-1.0.8-py3-none-any.whl", hash = "sha256:24f727df1e20b9876fa6e95f840a2a2651e34c0ad147676356f4bf5fbb0206ca"},
     {file = "msgpack-1.0.8.tar.gz", hash = "sha256:95c02b0e27e706e48d0e5426d1710ca78e0f0628d6e89d5b5a5b91a5f12274f3"},
 ]
 
@@ -5387,7 +5388,6 @@ files = [
     {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"},
     {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"},
     {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"},
-    {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"},
     {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"},
     {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"},
     {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"},
@@ -7329,7 +7329,7 @@ cffi = {version = ">=1.11", markers = "platform_python_implementation == \"PyPy\""}
 cffi = ["cffi (>=1.11)"]
 
 [extras]
-all = ["black", "clickhouse-connect", "dask", "datafusion", "db-dtypes", "deltalake", "duckdb", "fsspec", "geopandas", "google-cloud-bigquery", "google-cloud-bigquery-storage", "graphviz", "impyla", "oracledb", "packaging", "pins", "polars", "psycopg2", "pydata-google-auth", "pydruid", "pyexasol", "pymysql", "pyodbc", "pyspark", "regex", "shapely", "snowflake-connector-python", "trino"]
+all = ["black", "chdb", "clickhouse-connect", "dask", "datafusion", "db-dtypes", "deltalake", "duckdb", "fsspec", "geopandas", "google-cloud-bigquery", "google-cloud-bigquery-storage", "graphviz", "impyla", "oracledb", "packaging", "pins", "polars", "psycopg2", "pydata-google-auth", "pydruid", "pyexasol", "pymysql", "pyodbc", "pyspark", "regex", "shapely", "snowflake-connector-python", "trino"]
 bigquery = ["db-dtypes", "google-cloud-bigquery", "google-cloud-bigquery-storage", "pydata-google-auth"]
 chdb = ["chdb"]
 clickhouse = ["clickhouse-connect"]
@@ -7360,4 +7360,4 @@ visualization = ["graphviz"]
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.9"
-content-hash = "4187fa71283c284cda0c6f5770c5a17a58bbf3442e854ae8d8bb44d01ced475a"
+content-hash = "09ce30e117acb60707f9117f093d3bde729524a9193556b158e9d39d5bd44a1d"
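
Note for reviewers: the core mechanism here is that `_persisted_memtables` spills each `ops.InMemoryTable` to an Arrow IPC file in a temporary directory and rewrites the expression to reference an `ArrowFileTable`, which `ChdbCompiler.visit_ArrowFileTable` compiles to ClickHouse's `file(path, 'Arrow')` table function. A minimal standalone sketch of that round trip outside ibis (assuming only `chdb` and `pyarrow` are installed; the path and data are illustrative):

```python
# Sketch of the trick _persisted_memtables relies on: write an in-memory
# pyarrow Table to an Arrow IPC file, then read it back through ClickHouse's
# file() table function via chdb.
import pathlib
import tempfile

import chdb
import pyarrow as pa

data = pa.table({"a": [1, 2, 3], "b": ["x", "y", "z"]})

with tempfile.TemporaryDirectory() as tmpdir:
    path = pathlib.Path(tmpdir) / "memtable.arrow"
    # RecordBatchFileWriter emits the Arrow IPC *file* format, which is the
    # layout ClickHouse's Arrow input format expects.
    with pa.OSFile(str(path), "wb") as out:
        with pa.RecordBatchFileWriter(out, data.schema) as writer:
            writer.write_table(data)
    # The compiler emits this same shape of query for ArrowFileTable nodes.
    print(chdb.query(f"SELECT sum(a) FROM file('{path}', 'Arrow')", "CSV"))
```

Keeping the `yield` inside the `TemporaryDirectory` block in `_persisted_memtables` is what guarantees the spilled files still exist when `raw_sql` executes the compiled query; they are cleaned up as soon as the result has been materialized.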
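End to end, the change makes expressions over `ibis.memtable(...)` executable on the chdb backend without the ClickHouse external-tables protocol. A hypothetical usage sketch (the `ibis.chdb.connect()` entry point and its default arguments are assumptions, not shown in this diff):

```python
# Hypothetical end-to-end usage on this branch: an in-memory table executed
# entirely in-process through chdb.
import ibis

con = ibis.chdb.connect()  # in-process ClickHouse; connect args assumed

t = ibis.memtable({"x": [1, 2, 3], "y": ["a", "b", "a"]})
expr = t.group_by("y").aggregate(total=t.x.sum())

# execute()/to_pyarrow() spill the memtable to a temporary Arrow file and
# compile the reference to file('<tmpdir>/<name>.arrow', 'Arrow').
print(con.execute(expr))
```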