feat: support read_parquet for backend with no native support #9744
@@ -4,12 +4,14 @@
import collections.abc
import contextlib
import functools
import glob
import importlib.metadata
import keyword
import re
import sys
import urllib.parse
import weakref
from io import BytesIO
from pathlib import Path
from typing import TYPE_CHECKING, Any, ClassVar, NamedTuple
@@ -1269,6 +1271,128 @@
            f"{cls.name} backend has not implemented `has_operation` API"
        )
    @util.experimental
    def read_parquet(
        self, path: str | Path, table_name: str | None = None, **kwargs: Any
    ) -> ir.Table:
        """Register a parquet file as a table in the current backend.

        This function reads a Parquet file and registers it as a table in the
        current backend. Note that for the Impala and Trino backends,
        performance may be suboptimal.

        Parameters
        ----------
        path
            The data source. May be a path to a file, an iterable of files,
            or a directory of parquet files.
        table_name
            An optional name to use for the created table. This defaults to
            a sequentially generated name.
        **kwargs
            Additional keyword arguments passed to the pyarrow loading function.
            See https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html
            for more information.

        When reading data from cloud storage (such as Amazon S3 or Google Cloud
        Storage), credentials can be provided via the `filesystem` argument by
        creating an appropriate filesystem object (e.g. `pyarrow.fs.S3FileSystem`).
Review comment: This is true, and additionally, …

        For URLs with credentials, `fsspec` is used to handle authentication and
        file access. Pass the credentials using the `credentials` keyword argument;
        `fsspec` will use these credentials to manage access to the remote files.
        Returns
        -------
        ir.Table
            The just-registered table

        Examples
        --------
        Connect to a SQLite database:

        >>> import ibis
        >>> con = ibis.sqlite.connect()

        Read a single parquet file:

        >>> table = con.read_parquet("path/to/file.parquet")

        Read all parquet files in a directory:

        >>> table = con.read_parquet("path/to/parquet_directory/")

        Read parquet files with a glob pattern:

        >>> table = con.read_parquet("path/to/parquet_directory/data_*.parquet")

        Read from Amazon S3:

        >>> table = con.read_parquet("s3://bucket-name/path/to/file.parquet")

        Read from Google Cloud Storage:

        >>> table = con.read_parquet("gs://bucket-name/path/to/file.parquet")

        Read from an HTTPS URL:

        >>> table = con.read_parquet("https://example.com/data/file.parquet")

        Read with a custom table name:

        >>> table = con.read_parquet("s3://bucket/data.parquet", table_name="my_table")

        Read with additional pyarrow options:

        >>> table = con.read_parquet("gs://bucket/data.parquet", columns=["col1", "col2"])

        Read from Amazon S3 with secret info:

        >>> from pyarrow import fs
        >>> s3_fs = fs.S3FileSystem(
        ...     access_key="YOUR_ACCESS_KEY", secret_key="YOUR_SECRET_KEY", region="YOUR_AWS_REGION"
        ... )
        >>> table = con.read_parquet("s3://bucket/data.parquet", filesystem=s3_fs)

        Read from an HTTPS URL with authentication tokens:

        >>> table = con.read_parquet(
        ...     "https://example.com/data/file.parquet",
        ...     credentials={"headers": {"Authorization": "Bearer YOUR_TOKEN"}},
        ... )

        """
        table = self._get_pyarrow_table_from_path(path, **kwargs)
        table_name = table_name or util.gen_name("read_parquet")
        self.create_table(table_name, table)
        return self.table(table_name)
    def _get_pyarrow_table_from_path(self, path: str | Path, **kwargs) -> pa.Table:

Review comment: Why can't the implementation of this just be:

    return pq.read_table(path, **kwargs)

Did you try that already?

Reply: I tried that in my first commit; it cannot handle all the cases, such as glob patterns and Parquet files hosted at some URI, e.g. HTTPS or SFTP. Pyarrow implements natively the following filesystem subclasses: …

Reply: @cpcloud does this make sense to you?
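To illustrate the distinction being discussed in this thread, here is a rough sketch (not part of the PR; the paths are placeholders, and it assumes plain pyarrow with no fsspec integration): pyarrow alone resolves local paths, directories, and URIs for its built-in filesystems, but it does not expand glob patterns and has no handler for HTTP(S) or SFTP URLs.

```python
import pyarrow.parquet as pq

# Cases pyarrow handles on its own: a local file, a directory of parquet
# files, or a URI for a filesystem pyarrow implements natively (e.g. s3://).
t1 = pq.read_table("data/file.parquet")
t2 = pq.read_table("s3://bucket/prefix/")

# Cases it does not: glob patterns are treated as literal file names, and
# there is no built-in filesystem for https:// or sftp:// URLs, so both of
# these fail with errors instead of reading data.
# t3 = pq.read_table("data/part-*.parquet")
# t4 = pq.read_table("https://example.com/file.parquet")
```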
        import pyarrow.parquet as pq

        path = str(path)
        # handle url
        if util.is_url(path):
            import fsspec

            credentials = kwargs.pop("credentials", {})
            with fsspec.open(path, **credentials) as f:
                with BytesIO(f.read()) as reader:
                    return pq.read_table(reader)

        # handle fsspec compatible url
        if util.is_fsspec_url(path):
            return pq.read_table(path, **kwargs)

        # Handle local file paths or patterns
        paths = glob.glob(path)
        if not paths:
            raise ValueError(f"No files found matching pattern: {path!r}")
        elif len(paths) == 1:
            paths = paths[0]

        return pq.read_table(paths, **kwargs)
Review comment: I think we should reconsider handling all of these cases -- this sort of branching logic means that when a user reports an error, we'll have any number of possible culprits to consider, and it makes it harder to debug for everyone. I think (and I could be wrong) that nearly all of these cases are covered by …
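One possible shape for the consolidation suggested above, sketched under the assumption that the reviewer has fsspec in mind (the helper name and the `storage_options` parameter are invented for illustration): fsspec picks the filesystem from the URL scheme and expands glob patterns itself, so a single code path can cover local files, globs, and remote URLs.

```python
import fsspec
import pyarrow as pa
import pyarrow.parquet as pq


def read_parquet_via_fsspec(
    path: str, storage_options: dict | None = None, **kwargs
) -> pa.Table:
    # fsspec resolves the protocol (local, s3://, gs://, https://, sftp://, ...)
    # and expands glob patterns; pyarrow only ever sees file-like objects.
    # A bare directory would still need an explicit pattern like "dir/*.parquet".
    files = fsspec.open_files(path, mode="rb", **(storage_options or {}))
    if not files:
        raise ValueError(f"No files found matching: {path!r}")
    tables = []
    for of in files:
        with of as f:
            tables.append(pq.read_table(f, **kwargs))
    return tables[0] if len(tables) == 1 else pa.concat_tables(tables)
```

The trade-off is that each remote protocol needs its matching fsspec implementation installed (s3fs, gcsfs, and so on), whereas the pyarrow-native branch avoids that extra dependency for S3 and GCS.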
    def _transpile_sql(self, query: str, *, dialect: str | None = None) -> str:
        # only transpile if dialect was passed
        if dialect is None:
Review comment: Instead of BytesIO, I could pass the fsspec object; it could be an HTTPFile if we pass an HTTP URL. Not sure what is the best way to handle the type of `path`. @gforsyth any suggestion?

Reply: I think `fsspec` is a good option.
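A small sketch of what that alternative could look like, assuming the fsspec file object (an HTTPFile for HTTP URLs) is handed directly to pyarrow, which accepts seekable file-like objects; the function name and URL are placeholders, not part of the PR:

```python
import fsspec
import pyarrow as pa
import pyarrow.parquet as pq


def read_remote_parquet(url: str, credentials: dict | None = None) -> pa.Table:
    # Stream through the fsspec file object instead of copying the whole file
    # into a BytesIO buffer first; pq.read_table accepts any file-like object,
    # provided the remote server supports range requests for seeking.
    with fsspec.open(url, mode="rb", **(credentials or {})) as f:
        return pq.read_table(f)
```

With this shape, `read_parquet` could pass the open handle through unchanged, avoiding the extra in-memory copy for large remote files.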