Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support read_parquet for backend with no native support #9744

Closed
wants to merge 31 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
ab2ad16
support read_parquet for backend with no native support
jitingxu1 Aug 1, 2024
661f50d
fix unit tests
jitingxu1 Aug 2, 2024
e16f1bb
resolve Xpass and clickhouse tests
jitingxu1 Aug 2, 2024
eaec7a2
handle different inputs
jitingxu1 Aug 5, 2024
9106ad8
Merge branch 'main' into extend-read-parquet
jitingxu1 Aug 6, 2024
27d7a08
pandas not suporting glob pattern
jitingxu1 Aug 6, 2024
ac6117f
Merge branch 'main' into extend-read-parquet
gforsyth Aug 6, 2024
3ce9674
tests for url and fssepc url
jitingxu1 Aug 18, 2024
24530ca
resolve pandas use pyarrow as default
jitingxu1 Aug 19, 2024
bb238af
add test for is_url and is_fsspec_url
jitingxu1 Aug 21, 2024
12cfc7d
change to fssepc and add examples
jitingxu1 Aug 23, 2024
2cf597a
add reason for mark.never
jitingxu1 Aug 23, 2024
b4cf0ea
re run workflow
jitingxu1 Aug 23, 2024
2ba5002
Merge branch 'main' into extend-read-parquet
jitingxu1 Aug 23, 2024
6f2c754
Merge branch 'main' into extend-read-parquet
jitingxu1 Sep 11, 2024
24bfe38
lint
jitingxu1 Sep 15, 2024
6a50c46
Merge branch 'main' into extend-read-parquet
jitingxu1 Sep 15, 2024
4579bff
remove pandas
jitingxu1 Sep 15, 2024
d1ed444
Merge branch 'main' into extend-read-parquet
jitingxu1 Sep 16, 2024
b01bc6a
Merge branch 'main' into extend-read-parquet
jitingxu1 Sep 18, 2024
e70de2f
Merge branch 'ibis-project:main' into extend-read-parquet
jitingxu1 Sep 18, 2024
413ada7
Merge branch 'ibis-project:main' into extend-read-parquet
jitingxu1 Sep 19, 2024
c3fba44
reconcile coe
jitingxu1 Sep 19, 2024
8b6b3c6
Merge branch 'main' into extend-read-parquet
jitingxu1 Sep 20, 2024
0d55190
skip trino and impala
jitingxu1 Sep 20, 2024
fda5493
Trigger CI
jitingxu1 Sep 20, 2024
71ebb8e
chore: trigger CI
jitingxu1 Sep 21, 2024
2473c02
chore(test): skip test for backends with own parquet readers
gforsyth Sep 23, 2024
3ab60a8
chore: simplify the logic
jitingxu1 Sep 25, 2024
59c03e0
Merge branch 'main' into extend-read-parquet
jitingxu1 Sep 25, 2024
c0c1fd1
chore: lint
jitingxu1 Sep 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions ibis/backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import collections.abc
import contextlib
import functools
import glob
import importlib.metadata
import keyword
import re
Expand All @@ -22,6 +23,7 @@

if TYPE_CHECKING:
from collections.abc import Iterable, Iterator, Mapping, MutableMapping
from io import BytesIO
from urllib.parse import ParseResult

import pandas as pd
Expand Down Expand Up @@ -1269,6 +1271,100 @@ def has_operation(cls, operation: type[ops.Value]) -> bool:
f"{cls.name} backend has not implemented `has_operation` API"
)

@util.experimental
def read_parquet(
self, path: str | Path | BytesIO, table_name: str | None = None, **kwargs: Any
) -> ir.Table:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of BytesIO, I could pass the fsspec object; it could be an HTTPFile if we pass an HTTP URL. I'm not sure what the best way to handle the type of `path` is.

@gforsyth any suggestion?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think fsspec is a good option.

"""Register a parquet file as a table in the current backend.

This function reads a Parquet file and registers it as a table in the current
backend. Note that for the Impala and Trino backends, the performance
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
backend. Note that for Impala and Trino backends, the performance
backend. Note that for the Impala and Trino backends, the performance

may be suboptimal.

Parameters
----------
path
The data source. May be a path to a file, glob pattern to match Parquet files,
directory of parquet files, or BytesIO.
table_name
An optional name to use for the created table. This defaults to
a sequentially generated name.
**kwargs
Additional keyword arguments passed to the pyarrow loading function.
See https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html
for more information.

Returns
-------
ir.Table
The just-registered table

Examples
--------
Connect to a SQLite database:

>>> con = ibis.sqlite.connect()

Read a single parquet file:

>>> table = con.read_parquet("path/to/file.parquet")

Read all parquet files in a directory:

>>> table = con.read_parquet("path/to/parquet_directory/")

Read parquet files with a glob pattern

>>> table = con.read_parquet("path/to/parquet_directory/data_*.parquet")

Read from Amazon S3

>>> table = con.read_parquet("s3://bucket-name/path/to/file.parquet")

Read from Google Cloud Storage

>>> table = con.read_parquet("gs://bucket-name/path/to/file.parquet")

Read with a custom table name

>>> table = con.read_parquet("s3://bucket/data.parquet", table_name="my_table")

Read with additional pyarrow options

>>> table = con.read_parquet("gs://bucket/data.parquet", columns=["col1", "col2"])

Read from Amazon S3 with secret info

>>> from pyarrow import fs
>>> s3_fs = fs.S3FileSystem(
... access_key="YOUR_ACCESS_KEY", secret_key="YOUR_SECRET_KEY", region="YOUR_AWS_REGION"
... )
>>> table = con.read_parquet("s3://bucket/data.parquet", filesystem=s3_fs)

Read from HTTPS URL

>>> import fsspec
>>> from io import BytesIO
>>> url = "https://example.com/data/file.parquet"
>>> credentials = {}
>>> f = fsspec.open(url, **credentials).open()
>>> reader = BytesIO(f.read())
>>> table = con.read_parquet(reader)
>>> reader.close()
>>> f.close()
"""
import pyarrow.parquet as pq

table_name = table_name or util.gen_name("read_parquet")
paths = list(glob.glob(str(path)))
if paths:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add a comment here indicating that this is to help with reading from remote file locations

table = pq.read_table(paths, **kwargs)
else:
table = pq.read_table(path, **kwargs)

self.create_table(table_name, table)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the read_csv PR, this should probably be a memtable so we don't create a persistent table by default

return self.table(table_name)

def _transpile_sql(self, query: str, *, dialect: str | None = None) -> str:
# only transpile if dialect was passed
if dialect is None:
Expand Down
68 changes: 49 additions & 19 deletions ibis/backends/tests/test_register.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import csv
import gzip
import os
from io import BytesIO
from pathlib import Path
from typing import TYPE_CHECKING

Expand All @@ -21,7 +22,6 @@
import pyarrow as pa

pytestmark = [
pytest.mark.notimpl(["druid", "exasol", "oracle"]),
pytest.mark.notyet(
["pyspark"], condition=IS_SPARK_REMOTE, raises=PySparkAnalysisException
),
Expand Down Expand Up @@ -103,6 +103,7 @@
"trino",
]
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
ncclementi marked this conversation as resolved.
Show resolved Hide resolved
def test_register_csv(con, data_dir, fname, in_table_name, out_table_name):
with pushd(data_dir / "csv"):
with pytest.warns(FutureWarning, match="v9.1"):
Expand All @@ -114,7 +115,7 @@


# TODO: rewrite or delete test when register api is removed
@pytest.mark.notimpl(["datafusion"])
@pytest.mark.notimpl(["datafusion", "druid", "exasol", "oracle"])
ncclementi marked this conversation as resolved.
Show resolved Hide resolved
@pytest.mark.notyet(
[
"bigquery",
Expand Down Expand Up @@ -154,6 +155,7 @@
"trino",
]
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
def test_register_with_dotted_name(con, data_dir, tmp_path):
basename = "foo.bar.baz/diamonds.csv"
f = tmp_path.joinpath(basename)
Expand Down Expand Up @@ -211,6 +213,7 @@
"trino",
]
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
def test_register_parquet(
con, tmp_path, data_dir, fname, in_table_name, out_table_name
):
Expand Down Expand Up @@ -249,6 +252,7 @@
"trino",
]
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
def test_register_iterator_parquet(
con,
tmp_path,
Expand Down Expand Up @@ -277,7 +281,7 @@
# TODO: remove entirely when `register` is removed
# This same functionality is implemented across all backends
# via `create_table` and tested in `test_client.py`
@pytest.mark.notimpl(["datafusion"])
@pytest.mark.notimpl(["datafusion", "druid", "exasol", "oracle"])
@pytest.mark.notyet(
[
"bigquery",
Expand Down Expand Up @@ -311,7 +315,7 @@
# TODO: remove entirely when `register` is removed
# This same functionality is implemented across all backends
# via `create_table` and tested in `test_client.py`
@pytest.mark.notimpl(["datafusion", "polars"])
@pytest.mark.notimpl(["datafusion", "polars", "druid", "exasol", "oracle"])
@pytest.mark.notyet(
[
"bigquery",
Expand Down Expand Up @@ -352,6 +356,7 @@
"trino",
]
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
def test_csv_reregister_schema(con, tmp_path):
foo = tmp_path.joinpath("foo.csv")
with foo.open("w", newline="") as csvfile:
Expand Down Expand Up @@ -380,10 +385,13 @@
"bigquery",
"clickhouse",
"datafusion",
"druid",
"exasol",
"flink",
"impala",
"mysql",
"mssql",
"oracle",
"polars",
"postgres",
"risingwave",
Expand Down Expand Up @@ -414,12 +422,17 @@
("functional_alltypes.parquet", "funk_all"),
],
)
@pytest.mark.notyet(
["flink", "impala", "mssql", "mysql", "postgres", "risingwave", "sqlite", "trino"]
)
@pytest.mark.notyet(["flink"])
@pytest.mark.notimpl(["druid"])
def test_read_parquet(con, tmp_path, data_dir, fname, in_table_name):
pq = pytest.importorskip("pyarrow.parquet")

if con.name in ("trino", "impala"):
# TODO: remove after trino and impala have efficient insertion
pytest.skip(

Check warning on line 432 in ibis/backends/tests/test_register.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/tests/test_register.py#L432

Added line #L432 was not covered by tests
"Both Impala and Trino lack efficient data insertion methods from Python."
)

fname = Path(fname)
fname = Path(data_dir) / "parquet" / fname.name
table = pq.read_table(fname)
Expand All @@ -445,18 +458,8 @@
return table.slice(0, nrows)


@pytest.mark.notyet(
[
"flink",
"impala",
"mssql",
"mysql",
"postgres",
"risingwave",
"sqlite",
"trino",
]
)
@pytest.mark.notyet(["flink"])
@pytest.mark.notimpl(["druid"])
def test_read_parquet_glob(con, tmp_path, ft_data):
pq = pytest.importorskip("pyarrow.parquet")

Expand All @@ -473,6 +476,30 @@
assert table.count().execute() == nrows * ntables


@pytest.mark.notyet(["flink"])
@pytest.mark.notimpl(["druid"])
@pytest.mark.never(
    [
        "duckdb",
        "polars",
        "bigquery",
        "clickhouse",
        "datafusion",
        "snowflake",
        "pyspark",
    ],
    reason="backend implements its own read_parquet",
)
def test_read_parquet_bytesio(con, ft_data):
    """Check that `read_parquet` accepts an in-memory `BytesIO` source.

    The `ft_data` table is serialized as Parquet into a `BytesIO` buffer,
    the buffer is handed to `con.read_parquet`, and the row count of the
    resulting table is compared against the row count of `ft_data`.
    """
    parquet = pytest.importorskip("pyarrow.parquet")

    # Serialize the fixture table into memory instead of onto disk.
    buffer = BytesIO()
    parquet.write_table(ft_data, buffer)
    # Rewind so the reader starts at the beginning of the written bytes.
    buffer.seek(0)

    expected_rows = ft_data.num_rows
    registered = con.read_parquet(buffer)
    assert registered.count().execute() == expected_rows


@pytest.mark.notyet(
[
"flink",
Expand All @@ -485,6 +512,7 @@
"trino",
]
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
def test_read_csv_glob(con, tmp_path, ft_data):
pc = pytest.importorskip("pyarrow.csv")

Expand Down Expand Up @@ -519,6 +547,7 @@
raises=ValueError,
reason="read_json() missing required argument: 'schema'",
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
def test_read_json_glob(con, tmp_path, ft_data):
nrows = len(ft_data)
ntables = 2
Expand Down Expand Up @@ -565,6 +594,7 @@
@pytest.mark.notyet(
["flink", "impala", "mssql", "mysql", "postgres", "risingwave", "sqlite", "trino"]
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
def test_read_csv(con, data_dir, in_table_name, num_diamonds):
fname = "diamonds.csv"
with pushd(data_dir / "csv"):
Expand Down