
[Python] Remove redundant S3 call #33972

Closed
Fokko opened this issue Feb 1, 2023 · 6 comments · Fixed by #34015

@Fokko
Contributor

Fokko commented Feb 1, 2023

Describe the enhancement requested

Hey all,

First of all, thanks everyone for working on PyArrow! I'm really loving it so far. I'm currently working on PyIceberg, which loads an Iceberg table into PyArrow. For those unfamiliar with Apache Iceberg: it is a table format that focuses on huge tables (petabyte scale). PyIceberg makes your life easier by taking care of statistics to boost performance, and of all the schema maintenance. For example, if you change the partitioning of an Iceberg table, you don't have to rewrite all the files right away; you can do it incrementally.

Now I'm running into some performance issues, and I noticed that PyArrow is making more requests to S3 than required. I went down the rabbit hole and was able to narrow it down to:

import pyarrow.dataset as ds
from pyarrow.fs import S3FileSystem

ONE_MEGABYTE = 1024 * 1024

# Credentials for a local MinIO instance
client_kwargs = {
    "endpoint_override": "http://localhost:9000",
    "access_key": "admin",
    "secret_key": "password",
}
parquet_format = ds.ParquetFileFormat(
    use_buffered_stream=True,
    pre_buffer=True,
    buffer_size=8 * ONE_MEGABYTE,
)
fs = S3FileSystem(**client_kwargs)
with fs.open_input_file("warehouse/wh/nyc/taxis/data/tpep_pickup_datetime_day=2022-04-30/00003-4-89e0ad58-fb77-4512-8679-6f26d8d6ef28-00033.parquet") as fout:
    # First get the fragment, so we can inspect the physical schema
    fragment = parquet_format.make_fragment(fout, None)
    print(f"Schema: {fragment.physical_schema}")
    # Then scan the fragment into an in-memory table
    arrow_table = ds.Scanner.from_fragment(
        fragment=fragment
    ).to_table()

I need the schema first, because a column may have been renamed while the file hasn't been rewritten against the latest schema. The same goes for filtering: if you rename a column and a file still contains the old name, you want to leverage PyArrow's predicate pushdown so the data isn't loaded into memory at all.
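
For illustration, a hedged sketch of what that projection and filter step can look like once the physical schema is known, reusing parquet_format and fout from the snippet above (the column name "fare_amount_old" is hypothetical, standing in for a pre-rename on-disk name):

import pyarrow.dataset as ds
import pyarrow.compute as pc

fragment = parquet_format.make_fragment(fout, None)
print(fragment.physical_schema)  # resolves the on-disk column names

arrow_table = ds.Scanner.from_fragment(
    fragment=fragment,
    columns=["fare_amount_old"],              # project using the on-disk name
    filter=pc.field("fare_amount_old") > 10,  # pushdown on the on-disk name
).to_table()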

When looking at the MinIO logs, I can see that it makes four requests:

  1. A HEAD request to check that the file exists
  2. The last 64 KB of the Parquet file, to get the schema
  3. Another request for the last 64 KB of the Parquet file, again to get the schema
  4. A nice beefy 1978578 KB request to fetch the data

Looking at the tests, we shouldn't fetch the footer twice:

# with default discovery, no metadata loaded
with assert_opens([fragment.path]):
    fragment.ensure_complete_metadata()
assert fragment.row_groups == [0, 1]

# second time -> use cached / no file IO
with assert_opens([]):
    fragment.ensure_complete_metadata()

Any thoughts or advice? I went through the code a bit already, but my C++ is a bit rusty.

Component(s)

Python

@westonpace
Member

The datasets feature went through considerable change a while back when it moved from a Parquet-only feature to a format-agnostic one. It looks like this connection came loose in the conversion. If you just want to read one file, the approach is normally something more like:

import pyarrow.parquet as pq
pq.read_table(path)

If you're looking to read a collection of files, you would normally use:

import pyarrow.dataset as ds
ds.dataset([paths]).to_table()

I suspect (though I'm not entirely certain) that both of the above paths read the metadata only once.

However, your usage is legitimate, and it even affects the normal datasets path when you scan the dataset multiple times (because we should be caching the metadata on the first scan and reusing it on the second). So I would consider this a bug.

I don't know for sure, but my guess is the problem is here. The fragment is opening a reader and should pass the metadata to the reader, if it is already populated.
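
A hedged Python-level analogue of that hand-off (not the actual C++ fix): pyarrow.parquet.ParquetFile accepts an already-parsed footer via its metadata parameter, so the footer is fetched only once. This reuses fs from the snippet above and elides the object key:

import pyarrow.parquet as pq

path = "warehouse/wh/nyc/taxis/data/..."  # same object key as above, elided here
with fs.open_input_file(path) as f:
    metadata = pq.read_metadata(f)             # one footer read
    pf = pq.ParquetFile(f, metadata=metadata)  # reader reuses the parsed footer
    table = pf.read()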

@westonpace
Member

You mentioned in the other issue that you want to reuse the connection. Could you clarify your larger goal a bit? Or perhaps do you have some example code somewhere showing how you're planning to use this?

For example, are you bringing in an S3 connection from outside of pyarrow, or do you start with a path? Are you reading from the same dataset multiple times, or is this a one-shot operation (or does the list of files change from call to call)?

@Fokko
Contributor Author

Fokko commented Feb 2, 2023

@westonpace sure thing!

We need to make projections, and we need to have the schema before loading the data. For example, if you have an Iceberg table and you rename a column, you don't want to rewrite your multi-petabyte table. Iceberg uses IDs to identify columns, and if you filter or project on a renamed column, it will select the old column name in the files that were written before the rename.

The current code is over here: https://github.com/apache/iceberg/blob/master/python/pyiceberg/io/pyarrow.py#L486-L522
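
As an aside, those field IDs are visible from PyArrow when the files were written with Parquet field IDs; a hedged sketch (the file name is hypothetical):

import pyarrow.parquet as pq

pf = pq.ParquetFile("00000-0-data.parquet")  # hypothetical file written with field IDs
for field in pf.schema_arrow:
    # PyArrow surfaces the Parquet field ID as field-level metadata
    # under the key b"PARQUET:field_id", when present in the file.
    field_id = (field.metadata or {}).get(b"PARQUET:field_id")
    print(field.name, field_id)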

Fokko added commits to Fokko/arrow that referenced this issue Feb 3, 2023
@westonpace
Member

westonpace commented Feb 3, 2023

We need to make projections, and we need to have the schema before loading the data. For example, if you have an Iceberg table and you rename a column, you don't want to rewrite your multi-petabyte table. Iceberg uses IDs to identify columns, and if you filter or project on a renamed column, it will select the old column name in the files that were written before the rename.

OK, that helps. In the short term, I think you should use pyarrow.parquet.ParquetFile. That's a direct binding to the parquet-cpp libraries and won't use any of the dataset machinery. We don't have a format-agnostic concept of "read the metadata but cache it for use later so you don't have to read it again".
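
For example, a minimal sketch of that approach, reusing fs from the first snippet and eliding the object key (illustrative only):

import pyarrow.parquet as pq

path = "warehouse/wh/nyc/taxis/data/..."  # same object key as above, elided
with fs.open_input_file(path) as f:
    pf = pq.ParquetFile(f)
    print(pf.schema_arrow)  # schema comes from a single footer read
    table = pf.read()       # reuses the already-parsed metadata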

Longer term, you can probably just specify a custom evolution strategy (using parquet column IDs) and let pyarrow handle the expression conversion for you. Sadly, this feature is not yet ready (I'm working on it when I can. 🤞 for 12.0.0)

Fokko added commits to Fokko/arrow that referenced this issue Feb 3, 2023
@jorisvandenbossche
Member

In the short term I think you should use pyarrow.parquet.ParquetFile.

The simple ParquetFile interface for single files doesn't support filtering row groups with a filter expression, so wouldn't that be a step back from using pq.read_table?

@Fokko
Contributor Author

Fokko commented Feb 3, 2023

We don't have a format-agnostic concept of "read the metadata but cache it for use later so you don't have to read it again".

That's not a problem, as long as it stays cached in the fragment. The read from the end of the file to get the footer is rather expensive (in terms of time), so we would love to eliminate that call. I went through the code and was able to pass the metadata from the fragment down to the reader: #34015
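
For context, a hedged sketch of the behavior the patch aims for, based on the test quoted earlier (illustrative only; the caching itself happens inside the fragment):

fragment = parquet_format.make_fragment(fout, None)
fragment.ensure_complete_metadata()  # one footer read, cached on the fragment

# With the patch, scanning reuses the cached metadata instead of
# re-reading the footer from S3.
arrow_table = ds.Scanner.from_fragment(fragment=fragment).to_table()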

The simple ParquetFile interface for single files doesn't support filtering row groups with a filter expression, so wouldn't that be a step back from using pq.read_table?

I agree, we need to have predicate pushdown 👍🏻

Longer term, you can probably just specify a custom evolution strategy (using parquet column IDs) and let pyarrow handle the expression conversion for you. Sadly, this feature is not yet ready (I'm working on it when I can. 🤞 for 12.0.0)

Let me know when something is ready, happy to test 👍🏻

Fokko added a commit to Fokko/arrow that referenced this issue Feb 3, 2023
westonpace added a commit that referenced this issue Feb 4, 2023
Closes #33972


Lead-authored-by: Fokko Driesprong <[email protected]>
Co-authored-by: Weston Pace <[email protected]>
Co-authored-by: Fokko Driesprong <[email protected]>
Signed-off-by: Weston Pace <[email protected]>
@westonpace westonpace added this to the 12.0.0 milestone Feb 4, 2023
sjperkins pushed a commit to sjperkins/arrow that referenced this issue Feb 10, 2023
gringasalpastor pushed a commit to gringasalpastor/arrow that referenced this issue Feb 17, 2023
fatemehp pushed a commit to fatemehp/arrow that referenced this issue Feb 24, 2023