
[C++] Acero cannot join large tables because of a misspecified test #37655

Closed
oliviermeslin opened this issue Sep 11, 2023 · 11 comments

@oliviermeslin

oliviermeslin commented Sep 11, 2023

TL;DR: PR 35087 introduced an explicit failure in large joins with Acero when key data is larger than 4 GB (solving the problem reported by issue 34474). However, I think (but I'm not sure) that this quick fix is too restrictive, because the size condition is applied to the total size of the tables to be joined rather than to the size of the keys. As a consequence, Acero fails when trying to merge large tables even when the size of the key data is well below 4 GB.


EDIT: It looks like this bug is actually related to the size of the right table, but unrelated to the size of the left table (see here).

In this issue I proceed in four steps. First, I show that the test introduced by PR 35087 erroneously applies to the total size of the data processed in the join rather than to the size of the key data. Second, I try to find the root cause of this behavior. Third, I discuss whether this is really a bug or the expected behavior of Acero. Fourth, I sketch a possible solution.

Description of the problem

PR 35087 introduced an explicit failure in large joins with Acero when key data is larger than 4 GB (here). However, I discovered that this error message disappears when I reduce the number of columns in the Arrow Tables I am trying to merge.

In the following reproducible example, I generate a large Arrow Table and merge it with itself, increasing the number of columns in the right table. The merge works fine for a limited number of columns, then the error message pops up when the number of columns reaches a threshold (8 in my case).

As a consequence, Acero throws an error whenever I try to merge large Arrow Tables, even for tables with key data significantly smaller than 4 GB.

Reproducible example in R

library(stringi)
library(arrow)
library(dplyr)


# Generate a large number of rows with ONE heavy key column
n_rows <- 2e7
length_id <- 20
ids <- stringi::stri_rand_strings(n_rows, length = length_id)

# Create a large Arrow Table with heavy payloads
data <- data.frame(
  id = ids
) |> 
  as_arrow_table() |>
  mutate(
    # Create payload variables
    variable1   = id,
    variable2   = id,
    variable3   = id,
    variable4   = id,
    variable5   = id,
    variable6   = id,
    variable7   = id,
    variable8   = id,
    variable9   = id,
    variable10  = id
  ) |>
  compute()


vars <- names(data)[!names(data) %in% c("id")]
nb_var <- length(vars)

# Join the dataset with itself, with an increasing number of variables
lapply(1:nb_var, function(n) {
  print(paste0("Doing the join with ", n+1, " variables"))
  vars_temp <- c("id", vars[1:n])
  print(vars_temp)
  data_out <- data |> 
    left_join(
      data |>
        select(all_of(vars_temp)),
      by = c("id" = "id")
    ) |>
    compute()
  return("Success!")
})
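
For context, here is a rough size estimate for this example. This is my own back-of-the-envelope arithmetic (assuming 32-bit string offsets), not something taken from the Arrow internals: the key column alone is around 0.5 GB, while the full table with ten payload columns is above 5 GB, so a check applied to whole-table size trips long before the key data itself approaches 4 GB.

n_rows <- 2e7
length_id <- 20
bytes_per_string <- length_id + 4                 # 20 characters + a 32-bit offset
key_bytes   <- n_rows * bytes_per_string          # id column only
table_bytes <- key_bytes * 11                     # id + 10 identical payload columns
c(key_GB = key_bytes / 1e9, table_GB = table_bytes / 1e9)
# key_GB ~ 0.48, table_GB ~ 5.3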

Cause of the problem

I dug into the C++ source code of Acero to understand the problem. Disclaimer: I do not know anything about C++, so my report might be messy from here on.

PR 35087 introduced a logical test in Status RowArrayMerge::PrepareForMerge(). This test computes what I understand to be the size of the sources (here).

I think the problem comes from the fact that RowArrayMerge::PrepareForMerge() is called twice in SwissTableForJoinBuild::PreparePrtnMerge(): once for the keys (here) and once for the payload variables (here). My intuition is that, when applied to the payload variables, the logical test actually computes the size of the payload data, i.e. more or less the size of the tables to be joined. EDIT: It looks like this bug is actually related to the size of the right table, but unrelated to the size of the left table (see here).

Is this really a bug?

Given that I don't know how Acero performs joins, I'm not entirely sure whether the 4 GB size limit mentioned by @westonpace in this message applies to the size of the keys or to the size of the tables to be joined. My understanding of the discussion in the issue and of the error message is that the limit applies to the keys, in which case the behavior I describe should be considered a bug. But maybe I misunderstood and the limit applies to table size, in which case the behavior I describe is the expected one.

In other words: what exactly is the limitation in Acero? Can it not join large tables, or can it not join tables with large key data?

Suggested solution

If the behavior I describe is an actual bug, a potential solution could look like this:

  • Introduce an additional boolean argument in RowArrayMerge::PrepareForMerge(). If this argument is TRUE, the logical test (here) is performed; if FALSE, it is skipped;
  • Set this argument to TRUE in the first call to RowArrayMerge::PrepareForMerge() (here);
  • Set it to FALSE in the second call to RowArrayMerge::PrepareForMerge() (here).

With the help of ChatGPT, I opened a PR suggesting an (untested) solution (here).

Component(s)

C++

@oliviermeslin oliviermeslin changed the title [C++] PR #35087 makes merging large tables impossible even when key data size is not an issue [C++] Acero wrongly fails when merging large tables Sep 13, 2023
oliviermeslin added a commit to oliviermeslin/arrow that referenced this issue Sep 13, 2023
[PR 35087](apache#35087) introduced an explicit fail in large joins with Acero when key data is larger than 4GB (solving the problem reported by [issue 34474](apache#34474)). However, I think (but I'm not sure) that this quick fix is too restrictive because the total size condition is applied to the total size of tables to be joined, rather than to the size of keys. As a consequence, Acero fails when trying to merge large tables, even when the size of key data is well below 4 GB.

This PR modifies the source code so that the logical test only verifies whether the total size of _key variables_ is below 4 GB.
@oliviermeslin oliviermeslin changed the title [C++] Acero wrongly fails when merging large tables [C++] Acero cannot merge large tables because of a misspecified test Sep 22, 2023
@oliviermeslin oliviermeslin changed the title [C++] Acero cannot merge large tables because of a misspecified test [C++] Acero cannot join large tables because of a misspecified test Sep 22, 2023
@jorisvandenbossche jorisvandenbossche added this to the 14.0.0 milestone Sep 25, 2023
@vkhodygo

vkhodygo commented Oct 9, 2023

I wonder if it's possible to add some heuristics to improve this even further. Say you have a column with two or more distinct long strings: mapping it from string to char would noticeably decrease the amount of memory requested. In the case of only two distinct strings we could go even further and map the values to bool.

@oliviermeslin
Author

oliviermeslin commented Oct 10, 2023

@vkhodygo : I'm not sure what you mean by "improving this further".

  • If you mean that the example I provide is very rough and could be improved, you are right; I just tried to find an easy way to reproduce the bug.
  • If you mean that working on data types could be a solution, I'm really not sure: it would clearly help in some cases, but you would still hit the same problem if the datasets to be merged are large enough. For instance, in my daily use case I have to merge datasets with 20M-50M observations and 100+ columns, so the 4 GB limit is very much binding even after improving data types.

@vkhodygo

It's the latter. I know how you feel; dealing with TBs of data can be pretty annoying. However, resolving this issue might take some time, whereas many people would benefit from a fix right now.

I did have another workaround for some of my data:

  • group by keys
  • save as parquet
  • load and merge in batches

This is a very crude version of what the devs suggested, and it seems to work nicely; a rough sketch of the idea follows.
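
For illustration, here is a minimal sketch of that workaround in R (this code is mine, not part of the original comment; "left.parquet" and "right.parquet" are placeholder inputs, and the key column is the id column from the example above). Both tables are partitioned on a short prefix of the join key so that each bucket stays far below the 4 GB limit, and the join is then run bucket by bucket.

library(arrow)
library(dplyr)

# Partition both tables on a one-character prefix of the join key
open_dataset("left.parquet") |>
  mutate(key_bucket = substr(id, 1, 1)) |>
  write_dataset("left_partitioned", partitioning = "key_bucket")

open_dataset("right.parquet") |>
  mutate(key_bucket = substr(id, 1, 1)) |>
  write_dataset("right_partitioned", partitioning = "key_bucket")

# Join bucket by bucket and write each result out; matching rows always
# share the same bucket, so the union of the per-bucket joins equals the full join
buckets <- basename(list.dirs("left_partitioned", recursive = FALSE))
for (bucket in buckets) {
  open_dataset(file.path("left_partitioned", bucket)) |>
    left_join(
      open_dataset(file.path("right_partitioned", bucket)),
      by = "id"
    ) |>
    compute() |>
    write_dataset(file.path("joined", bucket))
}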

@raulcd
Member

raulcd commented Oct 10, 2023

@pitrou @jorisvandenbossche should this be part of 14.0.0?

@pitrou
Member

pitrou commented Oct 10, 2023

@raulcd Only if the fix is ready.

@oliviermeslin
Author

@vkhodygo : thanks for your quick reply. I'm not sure we are talking about the same thing. In my opinion there are actually two separate problems:

  • arrow cannot join tables where key data is larger than 4 GB because of the internals of Acero (this problem is explained here). This problem is likely to be quite difficult to solve, and the solution you suggest (batch processing) is probably the best we can do in the short term. But this is not the bug I think I found.
  • PR 35087 introduced a test to check whether key data is larger than 4 GB, and this test is erroneously applied to the size of the tables to be joined. This is the bug I found, and it looks like it could be easily fixed (see my PR).

I argue that solving this second problem would be a significant improvement over the current situation (even if the first problem remains), because I suspect that there are many use cases where tables are larger than 4 GB but key data is not.

@oliviermeslin
Author

oliviermeslin commented Oct 10, 2023

@pitrou @raulcd @jorisvandenbossche : Is there anything I could do to help? I can try to test this fix using artificial data, would that help?

@pitrou
Member

pitrou commented Oct 10, 2023

@pitrou @raulcd : Is there anything I could do to help? I can try to test this fix using artificial data, would that help?

Sorry. I posted comments on the PR; I now agree that it seems difficult to test efficiently.

@oliviermeslin
Author

oliviermeslin commented Oct 16, 2023

After some additional tests, I discovered that this bug is actually related to the size of the right table but insensitive to the size of the left table. So the bug is that the 4 GB key-size test is applied to the size of the complete right table.

Here are some additional tests to show this asymmetrical behavior.

library(stringi)
library(arrow)
library(dplyr)


# Generate a large number of rows with ONE heavy key column
n_rows <- 2e7
length_id <- 20
ids <- stringi::stri_rand_strings(n_rows, length = length_id)

# Create a large Arrow Table with heavy payloads
data <- data.frame(
  id = ids
) |> 
  as_arrow_table() |>
  mutate(
    # Create payload variables
    variable1   = id,
    variable2   = id,
    variable3   = id,
    variable4   = id,
    variable5   = id,
    variable6   = id,
    variable7   = id,
    variable8   = id,
    variable9   = id,
    variable10  = id
  ) |>
  compute()

# Doing the join with a heavy left table and a light right table: it works
print("Doing the join with a heavy left table and a light right table")
data_out <- data |> 
  left_join(
    data |>
      select(all_of(c("id", "variable1"))),
    by = c("id" = "id")
  ) |>
  compute()
print("Success!")

# Doing the join with a light left table and a heavy right table: it doesn't work
print("Doing the join with a light left table and a heavy right table")
data_out <- data |>
  select(all_of(c("id", "variable1"))) |> 
  left_join(
    data,
    by = c("id" = "id")
  ) |>
  compute()
print("Success!")

@raulcd raulcd modified the milestones: 14.0.0, 15.0.0 Oct 17, 2023
@raulcd raulcd modified the milestones: 15.0.0, 16.0.0 Jan 8, 2024
@raulcd raulcd modified the milestones: 16.0.0, 17.0.0 Apr 8, 2024
skysyzygy added a commit to brooklynacademyofmusic/tessistream that referenced this issue Jun 9, 2024
@raulcd raulcd removed this from the 17.0.0 milestone Jun 28, 2024
@rafapereirabr

Hey all! I'm facing the same issue and looking forward to seeing it fixed. Just wanted to share that, in the meantime, I'm converting the data with arrow::to_duckdb() and it works quite well. Code below:

# Create a large Arrow Table with heavy payloads
data <- data.frame(
  id = ids
) |> 
  as_arrow_table() |>
  mutate(
    # Create payload variables
    variable1   = id,
    variable2   = id,
    variable3   = id,
    variable4   = id,
    variable5   = id,
    variable6   = id,
    variable7   = id,
    variable8   = id,
    variable9   = id,
    variable10  = id
  ) |>
  compute()

data <- arrow::to_duckdb(data)
vars <- colnames(data)[!colnames(data) %in% c("id")]

@zanmato1984
Contributor

This issue should be resolved by #43389 so I'm closing it. Feel free to try and give us your feedback. Thanks.
