
[Python] ds.dataset cannot filter on hive-style partitions created with org.apache.spark.version: '3.4.1' #37802

Closed
lmocsi opened this issue Sep 20, 2023 · 12 comments

Comments

@lmocsi

lmocsi commented Sep 20, 2023

Describe the bug, including details regarding any error messages, version, and platform.

import polars as pl
import pyarrow.dataset as ds
df = pl.scan_pyarrow_dataset(ds.dataset(parq_path+filename, partitioning='hive'))
df.filter(pl.col('partition_column') == 'value').head(5).collect()

Not sure if this belongs to polars or pyarrow.
If I run the above code on a hive-partitioned parquet file created with org.apache.spark.version: '3.4.0', it runs fine.
If I run it on a file (having 8 simple columns) created with org.apache.spark.version: '3.4.1', it runs out of 32 GB of memory.

Component(s)

Parquet, Python

@lmocsi
Author

lmocsi commented Sep 21, 2023

It seems that the issue is not with the pyspark version but with something else. :(
(It must be some issue with the data itself.)

@AlenkaF
Member

AlenkaF commented Sep 26, 2023

It is hard to debug issues without a reproducible example.

Is the issue caused by the filtering in polars, or by reading the dataset in pyarrow? That is, if you load the dataset with pyarrow alone, without polars (ds.dataset(parq_path+filename, partitioning='hive')), do you also run into the memory issue?

You can also inspect the schema of the two different datasets created with different versions of Apache Spark, see https://arrow.apache.org/docs/python/dataset.html#dataset-discovery. Maybe you will be able to find the difference?
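A minimal sketch of such a comparison (the paths are placeholders for the two datasets written with the different Spark versions) could look like:

import pyarrow.dataset as ds

# Placeholder paths for the two datasets written with different Spark versions
ds_spark_340 = ds.dataset("data/written_with_spark_3_4_0", partitioning="hive")
ds_spark_341 = ds.dataset("data/written_with_spark_3_4_1", partitioning="hive")

print(ds_spark_340.schema)
print(ds_spark_341.schema)

# equals() compares field names and types; check_metadata=True also compares
# the schema-level metadata that Spark writes (e.g. org.apache.spark.version)
print(ds_spark_340.schema.equals(ds_spark_341.schema, check_metadata=True))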

@AlenkaF
Member

AlenkaF commented Sep 26, 2023

> You can also inspect the schema of the two different datasets created with different versions of Apache Spark, see https://arrow.apache.org/docs/python/dataset.html#dataset-discovery. Maybe you will be able to find the difference?

Oh, you mentioned in a later comment that the pyspark version is not the issue. What exactly is the issue then? Do you run out of memory in every case, no matter which version of pyspark you are using?

@lmocsi
Author

lmocsi commented Sep 27, 2023

Yes. The parquet dataset is 8.5 GB on disk, and reading it exhausts my 32 GB of RAM,
even though I'm filtering on the partition key. :(
I'm using polars 0.19.3 to read the data. The hive-partitioned dataset was created with pyspark 3.4.1.

@lmocsi
Author

lmocsi commented Sep 27, 2023

The bug seems to be in the to_polars() / to_pandas() conversion (both result in running out of memory):

Traceback (most recent call last):
  File "/tmp/1000920000/ipykernel_14253/1035655516.py", line 1, in <module>
    dataset.to_table(filter=ds.field('MONTH_CODE') == 'M999912').to_polars().head()
  File "pyarrow/_dataset.pyx", line 556, in pyarrow._dataset.Dataset.to_table
  File "pyarrow/_dataset.pyx", line 3638, in pyarrow._dataset.Scanner.to_table
  File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 117, in pyarrow.lib.check_status
pyarrow.lib.ArrowMemoryError: malloc of size 201326592 failed

The M999912 partition is less than 6 GB on disk (in parquet).

import pyarrow.dataset as ds
dataset = ds.dataset(path_to_dir, format='parquet', partitioning='hive')
dataset.to_table(filter=ds.field('MONTH_CODE') == 'M999912').to_polars().head()

@jorisvandenbossche
Member

If you leave out the to_polars().head() (only run the to_table()) in the example above, do you then have the same issue? The to_table method already reads everything into memory, so it would be strange if that did not already fail while the conversion to polars (which should be mostly zero-copy) does.

Could you show the schema of the dataset?
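As a minimal sketch of that isolation step (reusing path_to_dir from the snippet above), something like:

import pyarrow as pa
import pyarrow.dataset as ds

dataset = ds.dataset(path_to_dir, format='parquet', partitioning='hive')

# Materialize the filtered partition with pyarrow only, no polars conversion
table = dataset.to_table(filter=ds.field('MONTH_CODE') == 'M999912')

# Size of the materialized table and total bytes allocated by pyarrow
print(table.nbytes)
print(pa.total_allocated_bytes())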

@lmocsi
Author

lmocsi commented Sep 29, 2023

Leaving out the to_polars().head() shows the same behaviour: runs out of memory.
Schema is this:
EFFECTIVE_START_DATE: timestamp[ns]
EFFECTIVE_END_DATE: timestamp[ns]
VALID_FROM: timestamp[ns]
VALID_TO: timestamp[ns]
AB_PART_PARTY_ID: int64
AD_STTY_ID: int64
VALUE_DICT_ID: int64
MONTH_CODE: string
-- schema metadata --
org.apache.spark.timeZone: 'Europe/Budapest'
org.apache.spark.legacyINT96: ''
org.apache.spark.version: '3.4.0'
org.apache.spark.sql.parquet.row.metadata: '{"type":"struct","fields":[{"' + 569

Number of records should be 426694231 in that partition.

@jorisvandenbossche
Member

Can you do table = dataset.head(100_000, filter=ds.field('MONTH_CODE') == 'M999912') to see if reading the first X rows works OK? And if that works OK, what's the size of this part of the data in memory? (table.nbytes)
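Spelled out, that check is roughly:

table = dataset.head(100_000, filter=ds.field('MONTH_CODE') == 'M999912')
print(table.num_rows)   # should be 100000 if that many rows match the filter
print(table.nbytes)     # in-memory size of this sample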

@lmocsi
Author

lmocsi commented Sep 29, 2023

Reading the first 100000 records works fine.
table.nbytes returns 6762500

@jorisvandenbossche
Member

Extrapolating that to the full size of the file gives around 27 GB in memory. Are you sure you have enough memory? You mentioned 32 GB, but other programs you have running might also need memory.
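The arithmetic behind that estimate, as a quick sketch:

sample_bytes = 6_762_500                      # table.nbytes for the 100,000-row sample
bytes_per_row = sample_bytes / 100_000        # ~67.6 bytes per row
total_rows = 426_694_231                      # reported record count of the partition
print(bytes_per_row * total_rows / 2**30)     # ~26.9 GiB, i.e. roughly 27 GB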

@lmocsi
Author

lmocsi commented Sep 30, 2023

That could be.
But isn't the head() call pushed down, so that not all of the data is read in to_polars().head()?
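One way to read only a handful of rows on the pyarrow side, without materializing the whole filtered partition, is to stream record batches from a scanner and stop early. A sketch (this says nothing about what polars actually pushes down; path_to_dir is reused from above):

import pyarrow as pa
import pyarrow.dataset as ds

dataset = ds.dataset(path_to_dir, format='parquet', partitioning='hive')
scanner = dataset.scanner(filter=ds.field('MONTH_CODE') == 'M999912')

batches = []
rows_seen = 0
for batch in scanner.to_batches():    # streams batches instead of materializing all rows
    batches.append(batch)
    rows_seen += batch.num_rows
    if rows_seen >= 5:
        break

sample = pa.Table.from_batches(batches).slice(0, 5)   # only the first 5 matching rows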

@lmocsi
Author

lmocsi commented Feb 21, 2024

Closing this, because it is not really reproducible and I'm not sure what was causing the issue (polars or pyarrow).

@lmocsi lmocsi closed this as completed Feb 21, 2024
@kou kou changed the title ds.dataset cannot filter on hive-style partitions created with org.apache.spark.version: '3.4.1' [Python] ds.dataset cannot filter on hive-style partitions created with org.apache.spark.version: '3.4.1' Feb 22, 2024