
[Python] Table.join() produces incorrect results for large inputs #34474

Closed
ericlin4 opened this issue Mar 6, 2023 · 3 comments · Fixed by #35087
Labels
Component: Python; Critical Fix (Bugfixes for security vulnerabilities, crashes, or invalid data); Priority: Blocker (Marks a blocker for the release); Priority: Critical; Type: bug

Comments

ericlin4 commented Mar 6, 2023

Describe the bug, including details regarding any error messages, version, and platform.

PyArrow's join does not produce the same results as Pandas when the input tables are large. I am observing this in industry data I am working with, and I have a reproducible example below that mimics this data.

In this example, we have 72 million unique rows in each table with 9 join key columns of various types. The tables are identical except for the 'val' column in the second table.

PyArrow's join creates null values for 'val' where there should be actual values from the second table. The join performed in Pandas produces the expected result.

I can produce the same result as Pandas by splitting each table into pieces, joining each left piece to each right piece, coalescing 'val', and concatenating the outputs (e.g., pa.concat_tables([tbl1_a.join(tbl2_a).join(tbl2_b), tbl1_b.join(tbl2_a).join(tbl2_b)])).
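
Roughly, the workaround looks like the sketch below (illustrative only; piecewise_left_join and the two-piece split are my own scaffolding, not the exact code I ran, and it assumes the join keys are unique so each left row matches at most one right piece):

import pyarrow as pa
import pyarrow.compute as pc

def piecewise_left_join(left, right, keys, val_col='val', n_pieces=2):
    """Illustrative sketch: slice both tables, join each left slice against
    every right slice, coalesce the value column, and concatenate."""
    def slices(tbl):
        step = -(-tbl.num_rows // n_pieces)  # ceiling division
        return [tbl.slice(i, step) for i in range(0, tbl.num_rows, step)]

    right_pieces = slices(right)
    results = []
    for left_piece in slices(left):
        merged = left_piece
        for i, right_piece in enumerate(right_pieces):
            # Suffix the value column per piece so repeated joins don't collide.
            renamed = right_piece.rename_columns(
                [c if c in keys else f'{c}_{i}' for c in right_piece.column_names])
            merged = merged.join(renamed, keys=keys, join_type='left outer')
        # Keys are unique, so at most one right piece contributes a value per row.
        val = pc.coalesce(*[merged.column(f'{val_col}_{i}')
                            for i in range(len(right_pieces))])
        merged = merged.drop([f'{val_col}_{i}' for i in range(len(right_pieces))])
        results.append(merged.append_column(val_col, val))
    return pa.concat_tables(results)

#merge_pa = piecewise_left_join(df1_pa, df2_pa, join_keys)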

Apologies for the long-running example. The first section that generates the join key data takes about an hour on my machine (AWS r5.24xlarge EC2 instance) with the rest taking about 30 minutes. Around 100GB of memory is necessary to run the code.

import pyarrow as pa #11.0.0
import pandas as pd  #1.5.3
import numpy as np   #1.23.5

#Generate join key data
n_rows = 72000000
join_keys = [f'col{i}' for i in range(1,10)]

col_str = [str(i) for i in range(n_rows)]
col_date = [pd.to_datetime('2000-01-01') for i in range(n_rows)]
col_int = [i for i in range(n_rows)]

#Create dataframes -- df1 and df2 are identical except for the 'val' column
df1 = pd.DataFrame({'col1': col_str,
                    'col2': col_str,
                    'col3': col_str,
                    'col4': col_str,
                    'col5': col_str,
                    'col6': col_date,
                    'col7': col_date,
                    'col8': col_date,
                    'col9': col_int})

df2 = pd.DataFrame({'col1': col_str,
                    'col2': col_str,
                    'col3': col_str,
                    'col4': col_str,
                    'col5': col_str,
                    'col6': col_date,
                    'col7': col_date,
                    'col8': col_date,
                    'col9': col_int,
                    'val': [i for i in range(n_rows - 10000000)] + [np.nan for i in range(10000000)]})

#Create Pyarrow Tables and merge
df1_pa = pa.Table.from_pandas(df1)
df2_pa = pa.Table.from_pandas(df2)

merge_pa = df1_pa.join(df2_pa, keys = join_keys, join_type = 'left outer')
merge_pa_df = merge_pa.to_pandas()

#Merge dataframes analogously in Pandas
merge_pd_df = pd.merge(df1, df2, on = join_keys, how = 'left')

#Compare results -- should have the same number of non-null values in 'val'
print(f"Pyarrow join non-null rows: {sum(merge_pa_df['val'].notnull())}")
print(f"Pandas merge non-null rows: {sum(merge_pd_df['val'].notnull())}")

#Returns
#"Pyarrow join non-null rows: 37317087" (also changes from run to run)
#"Pandas merge non-null rows: 62000000"

#Expected
#"Pyarrow join non-null rows: 62000000"
#"Pandas merge non-null rows: 62000000"



#Example row of unexpected output
#merge_pd_df.rename({'val':'val_pd'}, axis = 1, inplace = True)
#merge_pa_df.rename({'val':'val_pa'}, axis = 1, inplace = True)
#comp = pd.merge(merge_pd_df, merge_pa_df, on = join_keys, how = 'left')

#col1   col2   col3   col4   col5   col6       col7       col8       col9   val_pd   val_pa
#0      0      0      0      0      2000-01-01 2000-01-01 2000-01-01 0      0.0      NaN

Component(s)

Python

ericlin4 changed the title from "Table.join() produces incorrect results for large inputs" to "[Python] Table.join() produces incorrect results for large inputs" on Mar 6, 2023
westonpace (Member) commented:

Thank you very much for the report!

#"Pyarrow join non-null rows: 37317087" (also changes from run to run)

Hmm, there isn't much that is non-deterministic in a hash-join. So my guess would be that this is some sort of race condition. Perhaps we are scheduling more tasks at the higher size and that is leading to the issue. I was able to come up with a reproducer that runs in under a minute and should be runnable with 32GB of RAM:

import pyarrow as pa #11.0.0
import pandas as pd  #1.5.3
import numpy as np   #1.23.5
import pyarrow.compute as pc

#Generate join key data
n_rows = 72_000_000
n_nan_rows = 10_000_000
join_keys = [f'col{i}' for i in range(1,10)]

some_date = pd.to_datetime('2000-01-01')
col_date = pa.array([some_date for i in range(n_rows)])
col_int = pa.array([i for i in range(n_rows)])
col_str = pc.cast(col_int, pa.string())

#Create tables -- df1_pa and df2_pa are identical except for the 'val' column
df1_pa = pa.Table.from_pydict({'col1': col_str,
                    'col2': col_str,
                    'col3': col_str,
                    'col4': col_str,
                    'col5': col_str,
                    'col6': col_date,
                    'col7': col_date,
                    'col8': col_date,
                    'col9': col_int})
print(f'left nbytes: {df1_pa.nbytes}')

values = pa.array([i for i in range(n_rows - n_nan_rows)] + [np.nan for i in range(n_nan_rows)])

df2_pa = pa.Table.from_pydict({'col1': col_str,
                    'col2': col_str,
                    'col3': col_str,
                    'col4': col_str,
                    'col5': col_str,
                    'col6': col_date,
                    'col7': col_date,
                    'col8': col_date,
                    'col9': col_int,
                    'val': values})
print(f'right nbytes: {df2_pa.nbytes}')

merge_pa = df1_pa.join(df2_pa, keys = join_keys, join_type = 'left outer')
vals_merged = merge_pa.column('val')

non_null_count = pc.count(merge_pa.column('val'))
nan_count = len(vals_merged.filter(pc.is_nan(vals_merged)))
print(f'Expected: {n_nan_rows} Actual: {nan_count}')

This will be a tricky one to get to the bottom of I think.

westonpace (Member) commented:

Ok, so it seems the non-determinism is from garbage memory and not threading. This code triggers a segmentation fault when run in debug mode. The error is somewhere in the hash-table and that code is pretty complex. That's about as far as I can get today but I'll try and find a day to really dive into this before the release. This needs to be fixed.

For future reference, I'm attaching the stack trace I am getting.
Attachment: snippet.txt

westonpace added the Priority: Critical and Critical Fix (Bugfixes for security vulnerabilities, crashes, or invalid data) labels on Mar 7, 2023
jorisvandenbossche added this to the 12.0.0 milestone on Apr 6, 2023
westonpace added the Priority: Blocker (Marks a blocker for the release) label on Apr 11, 2023
westonpace (Member) commented Apr 12, 2023

I managed to look into this today. The bad news is that this join isn't supported.

There are 9 key columns. The date and int columns are 8 bytes each. The string columns are variable-width but at least 4 bytes each, and they average out close enough to 8 bytes that we can just use 8.

72,000,000 rows * 8 bytes * 9 columns ~ 5GB of key data. We store key data in a structure that we index with uint32_t, which means we can have at most 4GiB of key data.

The current behavior is that we trigger an overflow and clobber existing data in our keys array, which leads to the results you are seeing (incorrect data).

I'm working on a fix that will detect this condition and fail the join when it encounters more than 4GiB of key data. My guess is that implementing hash-join spilling (e.g. #13669) would naturally increase this limit. Until then, the best we can do is fail.
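
A rough back-of-the-envelope version of that arithmetic (a sketch only; Acero's internal row-encoded accounting will differ somewhat):

# Rough estimate of the hash-join key data size for the reproducer above.
# The exact bookkeeping inside Acero differs, but the order of magnitude holds.
n_rows = 72_000_000
fixed_width_keys = 4        # col6-col9: three timestamps + one int64, 8 bytes each
string_keys = 5             # col1-col5: variable width, averaging roughly 8 bytes
avg_string_bytes = 8

key_bytes = n_rows * (fixed_width_keys * 8 + string_keys * avg_string_bytes)
limit_bytes = 2**32         # key storage is indexed with uint32_t -> 4 GiB cap

print(f"estimated key data: {key_bytes / 2**30:.2f} GiB")    # ~4.83 GiB
print(f"uint32_t limit:     {limit_bytes / 2**30:.2f} GiB")  # 4.00 GiB
print("over the limit" if key_bytes > limit_bytes else "within the limit")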

jorisvandenbossche pushed a commit that referenced this issue Apr 13, 2023
…h key data (#35087)

### Rationale for this change

This fixes the test in #34474, though there are likely still other bad scenarios with large joins. I've fixed this one since the behavior (invalid data) is particularly bad. Most of the time, if there is too much data, I'm guessing we probably just crash. Still, I think a test suite of some kind stressing large joins would be good to have. Perhaps this could be added if someone finds time to work on join spilling.

### What changes are included in this PR?

If the join will require more than 4GiB of key data it should now return an invalid status instead of invalid data.

### Are these changes tested?

No. I created a unit test but it requires over 16GiB of RAM (besides the input data itself (4GiB), by the time you get 4GiB of key data there are various other join state buffers that also grow). The test also took nearly a minute to run. I think investigation and creation of a test suite for large joins is probably a standalone effort.

### Are there any user-facing changes?

No.
* Closes: #34474

Authored-by: Weston Pace <[email protected]>
Signed-off-by: Joris Van den Bossche <[email protected]>
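
In practical terms, after this fix an over-limit join should surface on the Python side as an exception rather than silently wrong data. A rough sketch of handling it (the exact error message is not reproduced here; an invalid status from Acero is raised as pyarrow.ArrowInvalid):

import pyarrow as pa

# Using df1_pa / df2_pa / join_keys from the reproducer earlier in this issue.
try:
    merged = df1_pa.join(df2_pa, keys=join_keys, join_type='left outer')
except pa.ArrowInvalid as exc:
    # With the fix, exceeding the 4 GiB key-data cap fails the join with an
    # error instead of returning a table containing clobbered values.
    print(f"join rejected: {exc}")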
jorisvandenbossche modified the milestones: 12.0.0 → 13.0.0 on Apr 13, 2023
raulcd pushed a commit that referenced this issue Apr 17, 2023
…h key data (#35087)
liujiacheng777 pushed a commit to LoongArch-Python/arrow that referenced this issue May 11, 2023
…oo much key data (apache#35087)
ArgusLi pushed a commit to Bit-Quill/arrow that referenced this issue May 15, 2023
…oo much key data (apache#35087)
rtpsw pushed a commit to rtpsw/arrow that referenced this issue May 16, 2023
…oo much key data (apache#35087)
oliviermeslin added a commit to oliviermeslin/arrow that referenced this issue Sep 13, 2023
[PR 35087](apache#35087) introduced an explicit fail in large joins with Acero when key data is larger than 4GB (solving the problem reported by [issue 34474](apache#34474)). However, I think (but I'm not sure) that this quick fix is too restrictive because the 4 GB condition is applied to the total size of the tables to be joined, rather than to the size of the keys. As a consequence, Acero fails when trying to merge large tables, even when the size of the key data is well below 4 GB.

This PR modifies the source code so that the logical test only verifies whether the total size of the _key variables_ is below 4 GB.
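
From the Python side, the difference between the two conditions can be approximated by comparing the byte size of just the key columns with the size of the whole table (a sketch; estimate_key_bytes is an illustrative helper, and Acero's row-encoded key size is not exactly the sum of Arrow buffer sizes):

import pyarrow as pa

FOUR_GIB = 2**32

def estimate_key_bytes(table: pa.Table, keys) -> int:
    # Illustrative helper: sum the Arrow buffer sizes of only the key columns.
    return sum(table.column(k).nbytes for k in keys)

def report(table: pa.Table, keys) -> None:
    key_bytes = estimate_key_bytes(table, keys)
    total_bytes = table.nbytes
    print(f"key columns: {key_bytes / 2**30:.2f} GiB "
          f"(whole table: {total_bytes / 2**30:.2f} GiB, cap: 4 GiB)")
    # The follow-up PR argues the 4 GiB cap should be checked against
    # something like key_bytes, not the size of the whole table.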