
Broken filter for newly created delta table #2169

Closed
Hanspagh opened this issue Feb 6, 2024 · 15 comments · Fixed by #2172
Labels
bug Something isn't working

Comments

@Hanspagh

Hanspagh commented Feb 6, 2024

Environment

Delta-rs version:
'0.15.2'

Binding:
python

Environment:

  • OS: Mac

Bug

When creating a new delta table from a pandas dataframe, the filter predicate appears to be broken for some expressions.

What happened:
.to_pandas() and .to_pyarrow_dataset() return 0 rows

What you expected to happen:
The above functions should return the rows matching the filter predicate

How to reproduce it:
This is a large dataset that I cannot share, but please point me in any direction for how to debug this.

This is how I achieved my current results

from pyarrow import dataset as ds
from pyarrow.parquet import ParquetDataset
import pyarrow.compute as pc
from deltalake import write_deltalake, DeltaTable
import pandas as pd

df = "some_internal_data"

write_deltalake("broken_nlrot.delta", df)

dt_broken = DeltaTable("broken_nlrot.delta")
dt_broken.to_pyarrow_dataset().to_table(filter=(pc.field("terminal") == "NLROTTM")).shape
# returns (0, 19)

# Trying to read directly with pyarrow
ds.dataset("broken_nlrot.delta").to_table(filter=(pc.field("terminal") == "NLROTTM")).shape
# returns (773536, 19) which is the expected result

# similarly, to_pandas also does not seem to work
dt_broken.to_pandas(filters=[("terminal", "==", "NLROTTM")]).shape 
# returns (0,19)

# again with pyarrow we are fine
ParquetDataset("broken_nlrot.delta", filters=[("terminal", "==", "NLROTTM")]).read().shape
# returns (773536, 19) which is the expected result

# oddly enough, with a different filter predicate I don't get zero rows, but the results are still wrong
dt_broken.to_pandas(filters=[("terminal", "==", "USSAVGC")]).shape
# (89406, 19) - Wrong

ds.dataset("broken_nlrot.delta").to_table(filter=(pc.field("terminal") == "USSAVGC")).shape
# (420029, 19) - Correct

# pulling the full data in pandas also seems to work
full = dt_broken.to_pandas()
full[full["terminal"] == "NLROTTM"].shape
# (773536, 19)

full[full["terminal"] == "USSAVGC"].shape
# (420029, 19)


# calling optimize seems to fix the problem, but this should not be needed in order for filters to work
dt_broken.optimize.z_order(["terminal"])
dt_broken.to_pyarrow_dataset().to_table(filter=(pc.field("terminal") == "NLROTTM")).shape
# returns (773536, 19)

Since this is returning partially correct results, I suspect some row-group statistics are wrong, but then I would expect the calls from pyarrow to return incorrect results as well.
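One way to test that suspicion is to dump the statistics pyarrow sees for each row group. This is a sketch based on the repro above; "broken_nlrot.delta" and the column index are assumptions, adjust them to the actual table:

import glob
import pyarrow.parquet as pq

# Print per-row-group statistics for every parquet file in the table.
for path in sorted(glob.glob("broken_nlrot.delta/*.parquet")):
    md = pq.ParquetFile(path).metadata
    print(path, "->", md.num_row_groups, "row groups")
    for i in range(md.num_row_groups):
        rg = md.row_group(i)
        stats = rg.column(0).statistics  # column 0 stands in for "terminal"; pick the right index
        print(f"  row group {i}: rows={rg.num_rows}, stats={stats}")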

Hanspagh added the bug label on Feb 6, 2024
@ion-elgreco
Collaborator

ion-elgreco commented Feb 6, 2024

@Hanspagh can you please check the following things:

  • try write_deltalake(engine='rust'), since this takes pyarrow out of the equation (a usage sketch follows below); also, please share the pyarrow version you use now
  • try deltalake v0.15.1 or v0.15.0

Also, it would really help if you could mimic the structure of the data with fake/sample data so we can try to reproduce it. The only logical explanation I can think of for now is that the partition expression is incorrect.
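For reference, the rust-engine suggestion is a single keyword on the existing call (a sketch, with df standing in for the reporter's frame):

from deltalake import write_deltalake

# engine="rust" bypasses the pyarrow-based writer entirely
write_deltalake("broken_nlrot.delta", df, mode="overwrite", engine="rust")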

@Hanspagh
Author

Hanspagh commented Feb 6, 2024 via email

@Hanspagh
Author

Hanspagh commented Feb 6, 2024

Okay, so this seems to be related to pyarrow, since engine="rust" fixes it.
Currently I am using pyarrow 13.0.0; I will play around with the pyarrow/deltalake versions.

@Hanspagh
Author

Hanspagh commented Feb 6, 2024

So I managed to reproduce this. It only happens with large datasets; 10_485_761 seems to be the magic number. I tried pyarrow 15, 13, 12, 10, and 9. With pyarrow 8 the process seems to hang when I try to save a frame this big.

It looks as if the filter overflows and only returns the rows beyond 10_485_760, since we get 1 row for 10_485_761 and 2 rows for 10_485_762.

10_485_760 is also exactly 10 * 1024**2 (1024**2 = 1_048_576).

I hope this helps to figure out what is going on here. Let me know if you want me to provide more details.

import pandas as pd
import pyarrow.compute as pc
from deltalake import DeltaTable, write_deltalake

df = pd.DataFrame({"data": ["B"] * 10_485_760})
write_deltalake("sample.delta", df, mode="overwrite")
dt_broken = DeltaTable("sample.delta")
dt_broken.to_pyarrow_dataset().to_table(filter=(pc.field("data") == "B")).shape
# (10485760, 1)

df = pd.DataFrame({"data": ["B"] * 10_485_761 })
write_deltalake("sample.delta", df, mode="overwrite")
dt_broken = DeltaTable("sample.delta")
dt_broken.to_pyarrow_dataset().to_table(filter=(pc.field("data") == "B")).shape
# (1, 1)

df = pd.DataFrame({"data": ["B"] * 10_485_762 })
write_deltalake("sample.delta", df, mode="overwrite")
dt_broken = DeltaTable("sample.delta")
dt_broken.to_pyarrow_dataset().to_table(filter=(pc.field("data") == "B")).shape
# (2, 1)

@Hanspagh
Author

Hanspagh commented Feb 6, 2024

I found the magic number: it comes from the default of max_rows_per_file in write_deltalake, which is set to 10 * 1024 * 1024. I suggest this be aligned with pyarrow, where the default is None/0.
Probably min_rows_per_group and max_rows_per_group should be aligned as well.

max_rows_per_file: int = 10 * 1024 * 1024,

The limit forces pyarrow to split the parquet output in two, and it seems deltalake then ignores all but the last of those split files.
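A quick sketch of the split this describes ("split.delta" is a throwaway path, not from the thread): one row past the default limit yields a second data file in the table directory.

import glob
import pandas as pd
from deltalake import write_deltalake

# 10 * 1024 * 1024 is the default max_rows_per_file; one extra row
# forces the pyarrow writer to emit a second (tiny) parquet file.
df = pd.DataFrame({"data": ["B"] * (10 * 1024 * 1024 + 1)})
write_deltalake("split.delta", df, mode="overwrite")
print(sorted(glob.glob("split.delta/*.parquet")))
# expect two data files, e.g. ...-0.parquet and ...-1.parquet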

@ion-elgreco
Collaborator

ion-elgreco commented Feb 6, 2024

@Hanspagh there seems to be an issue with the creation of the pyarrow.dataset when there are multiple parquet files. I can write tables with v0.15.2 and then read them with v0.15.1 using the pc.field("data") == "B" expression.

v0.15.2 gives this fragment expression:

0-e67ec940-2288-4e27-90a5-ba1a07745685-0.parquet ((data >= null[string]) and (data <= null[string]))

While v0.15.1 gave 0-e67ec940-2288-4e27-90a5-ba1a07745685-0.parquet None
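A sketch of how to inspect these expressions yourself, using the sample table from the repro above:

from deltalake import DeltaTable

# Each pyarrow fragment carries a guarantee ("partition") expression;
# under v0.15.2 the broken file shows the null-bounds expression above.
for fragment in DeltaTable("sample.delta").to_pyarrow_dataset().get_fragments():
    print(fragment.path, fragment.partition_expression)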

@Hanspagh
Author

Hanspagh commented Feb 6, 2024

You are right, this is only a problem in 0.15.2.

Also, 0.15.2 seems to be printing some debugging info:

partition_values: {}
path: "0-14c7a12a-ee21-4fc6-b8a4-ea79c9a45c13-1.parquet"
partition_values: {}
path: "0-14c7a12a-ee21-4fc6-b8a4-ea79c9a45c13-0.parquet"

@Hanspagh
Author

Hanspagh commented Feb 6, 2024

Hmm, but it does not seem to be strictly related to the number of files.

This works fine:

df = pd.DataFrame({"data": ["B"] * 10 })
write_deltalake("broken.delta", df, max_rows_per_file=2, max_rows_per_group=2, min_rows_per_group=2)
# 5 output files
dt_broken = DeltaTable("/Users/hans.pagh/Downloads/broken.delta")
dt_broken.to_pyarrow_dataset().to_table(filter=(pc.field("data") == "B")).shape
# (10,1)

@ion-elgreco
Collaborator

ion-elgreco commented Feb 6, 2024

@Hanspagh I see the issue: the stats are empty on the add action for one of the files. I will have to check why they are empty now and not before : )

Edit:
Actually, they were missing before as well; the difference is that now this results in a partition expression which thinks there are null values 😕
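To see the missing stats directly, one can read the add actions out of the transaction log. This is a sketch; the log file name assumes the first table version:

import json

# Every line in a Delta log entry is one action; add actions carry an
# optional "stats" JSON string, which comes back None for the broken file.
with open("sample.delta/_delta_log/00000000000000000000.json") as log:
    for line in log:
        action = json.loads(line)
        if "add" in action:
            print(action["add"]["path"], action["add"].get("stats"))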

@Hanspagh
Author

Hanspagh commented Feb 6, 2024

This seems to be one of the smaller examples where it is broken:

df = pd.DataFrame({"data": ["B"] * 1024 * 33})
write_deltalake("broken.delta", df, max_rows_per_file=1024 * 32, max_rows_per_group=1024 * 16, min_rows_per_group=8 * 1024, mode="overwrite")

This seems fine, so it is something with max_rows_per_file:

df = pd.DataFrame({"data": ["B"] * 1024 * 33})
write_deltalake("broken.delta", df, max_rows_per_file=1024 * 31, max_rows_per_group=1024 * 16, min_rows_per_group=8 * 1024, mode="overwrite")

@ion-elgreco
Collaborator

@Hanspagh found the culprit: there is an empty row group in the parquet file. Our function get_file_stats_from_metadata checks whether stats are set for each row group, but in this case the last row group is empty and has no stats set, so it skips setting stats for the whole file.
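A sketch that makes the empty trailing row group visible with pyarrow's metadata API (paths taken from the repro above):

import glob
import pyarrow.parquet as pq

# The offending file ends with a row group of 0 rows whose column chunks
# report is_stats_set == False, which is what the stats gathering tripped on.
for path in sorted(glob.glob("sample.delta/*.parquet")):
    md = pq.ParquetFile(path).metadata
    for i in range(md.num_row_groups):
        rg = md.row_group(i)
        print(path, f"row group {i}: rows={rg.num_rows}, stats_set={rg.column(0).is_stats_set}")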

@Hanspagh
Author

Hanspagh commented Feb 6, 2024

Great find, really great to see this could be solved so fast. :)

Unrelated to this, deltalake seems to create more row groups than pyarrow, whose row limit per group defaults to 1 million. Is there a specific reason for this?

@ion-elgreco
Collaborator

@Hanspagh you mean the rust engine?

@Hanspagh
Author

Hanspagh commented Feb 6, 2024

No, your default setting for max_rows_per_group is 128 * 1024, whereas for pyarrow it is 1024 * 1024; it was my understanding that having "too small" row groups is not ideal.

I also suspect that the min_rows_per_group default is the reason for the empty row group?

pyarrow defaults
https://arrow.apache.org/docs/python/generated/pyarrow.dataset.write_dataset.html
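A sketch comparing the two sets of defaults ("defaults_delta" and "defaults_pa" are throwaway paths, not from the thread): the same million-row frame should come out with roughly eight row groups under deltalake's 128 * 1024 default versus one under pyarrow's 1024 * 1024.

import glob
import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
from deltalake import write_deltalake

df = pd.DataFrame({"data": ["B"] * (1024 * 1024)})

# deltalake default: max_rows_per_group = 128 * 1024
write_deltalake("defaults_delta", df, mode="overwrite")
# pyarrow default: max_rows_per_group = 1024 * 1024
ds.write_dataset(pa.Table.from_pandas(df), "defaults_pa", format="parquet")

for root in ("defaults_delta", "defaults_pa"):
    for path in glob.glob(f"{root}/*.parquet"):
        print(path, pq.ParquetFile(path).num_row_groups)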

@ion-elgreco
Collaborator

@Hanspagh not sure; I think they originated from the defaults Databricks uses with Spark Delta.

Fix is incoming btw

ion-elgreco added a commit that referenced this issue Feb 6, 2024
# Description
For some odd reason the pyarrow parquet writer will leave empty row
groups in the parquet file when it hits the max_rows_per_file limit
that's passed. While gathering the stats we were checking whether every
row group had stats set, but these empty row groups have no stats, which
caused the whole file's add action to get no stats recorded.

We now skip empty row groups while gathering the stats to prevent this.
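A minimal Python sketch of that guard (the actual fix lives in the Rust get_file_stats_from_metadata; gather_column_stats here is a hypothetical illustration, not the real signature):

# Hypothetical illustration of the fix: skip row groups with zero rows
# instead of treating their missing stats as "file has no stats".
def gather_column_stats(metadata, column_index):
    minimum, maximum, null_count = None, None, 0
    for i in range(metadata.num_row_groups):
        rg = metadata.row_group(i)
        if rg.num_rows == 0:
            continue  # empty trailing row group: no stats to read, skip it
        stats = rg.column(column_index).statistics
        if stats is None or not stats.has_min_max:
            return None  # genuinely missing stats still disqualify the file
        minimum = stats.min if minimum is None else min(minimum, stats.min)
        maximum = stats.max if maximum is None else max(maximum, stats.max)
        null_count += stats.null_count
    return minimum, maximum, null_count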

In v0.15.2 we now also evaluate files with no stats as null.
@roeap @rtyler, not sure if this is entirely correct as well.

# Related Issue(s)
- closes #2169

---------

Co-authored-by: Will Jones <[email protected]>