
[FEA] memory efficient explode and pos_explode implementations #6151

Closed
revans2 opened this issue Sep 3, 2020 · 1 comment · Fixed by #7140 or #7376
Assignees
Labels
feature request New feature or request libcudf Affects libcudf (C++/CUDA) code. Python Affects Python cuDF API. Spark Functionality that helps Spark RAPIDS

Comments


revans2 commented Sep 3, 2020

Is your feature request related to a problem? Please describe.

#2975 asks for pandas explode. We need something similar for Spark. We have already implemented a version of this when the explode is operating on scalar lists (don't need actual list support to implement that), but quickly ran into memory issues with it. This is because when you explode a data frame on a single column all of the other columns end up being replicated.

>>> a = pd.Series([[[1, 2], [3, 4]], [[5, 6], None]])
>>> b = pd.Series([100, 200])
>>> c = pd.DataFrame(data={'a': a, 'b': b})
>>> c.explode('a')
        a    b
0  [1, 2]  100
0  [3, 4]  100
1  [5, 6]  200
1    None  200

>>> c.explode('a').explode('a')
      a    b
0     1  100
0     2  100
0     3  100
0     4  100
1     5  200
1     6  200
1  None  200

So the more columns you have, and the deeper the arrays, the more duplicate data ends up being produced.

Because Spark processes a stream of tables instead of a single table, we implemented a "fix" for this by essentially keeping all of the columns cached in memory and outputting a new table for each entry in the scalar list, i.e.

for each item in scalar_explode_list:
    val new_column = scalarToColumn(item, cachedTable.numRows())
    yield new_column + cachedTable

Note that we also support a posexplode where we insert a new column that also includes the index of the array item.
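For reference, posexplode semantics can be reproduced in pandas (this is only an illustrative sketch, not the Spark plugin's or cuDF's implementation; `posexplode` is a hypothetical helper name):

```python
import pandas as pd

def posexplode(df, column):
    """Explode `column` and add a "pos" column holding each element's
    index within its source list (a sketch of posexplode semantics)."""
    out = df.explode(column)
    # explode() duplicates the original index, so grouping on it numbers
    # the exploded rows within each source row.
    pos = out.groupby(level=0).cumcount().to_numpy()
    out.insert(out.columns.get_loc(column) + 1, "pos", pos)
    return out

df = pd.DataFrame({"a": [[0, 1], [2], [3, 4, 5]], "b": [100, 200, 300]})
result = posexplode(df, "a")
```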

Describe the solution you'd like

We would like a solution that lets us do an explode and a posexplode while limiting the output size.

I see a few options for this.

  1. We could split the input table into smaller pieces and then run a regular explode on each of them in turn. The main thing we would need for this is some way to come up with the split positions. At a minimum we could do this based off of a target number of output rows (get the sum of the lengths of the lists in the explode column and then, assuming they are all roughly equal, come up with the number of splits to do). But that would be vulnerable to data skew, so a better algorithm, possibly looking at data size instead of just row count, would be ideal.

  2. We could do something similar to our current "fix" and get the length of the longest list entry in the column. Then go through one array index at a time, filter out all entries that are not long enough, and return the batch along with the item from the array at the given index.
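The split-point computation in option 1 could be sketched as follows (a hedged, skew-aware variant working from per-row list lengths; the function name and shape are illustrative, not a libcudf API):

```python
def explode_split_points(list_lengths, target_rows):
    """Choose input-table split points so each slice explodes to roughly
    `target_rows` output rows. `list_lengths` holds the length of the
    list in the explode column for each input row, which keeps the
    splits robust against skew in list sizes."""
    splits = []
    acc = 0
    for i, n in enumerate(list_lengths):
        acc += n
        if acc >= target_rows and i + 1 < len(list_lengths):
            splits.append(i + 1)  # split the table before input row i + 1
            acc = 0
    return splits
```

For example, lengths `[4, 1, 3, 2, 5]` with a 5-row target yield splits `[2, 4]`, giving slices that each explode to 5 output rows.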

I like option 1 if we can do it efficiently, because it is simple and is just a single stand-alone operation. Also, if we have skew in the lengths of the arrays, then option 2 would need some special-case processing to avoid having too much overhead when we hit the tail.
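The index-at-a-time pass of option 2 could be sketched as follows (a pure-Python stand-in for the cached-table flow; names are illustrative):

```python
def explode_by_index(rows):
    """Walk array indices 0..max_len-1 and, for each index, yield a batch
    of (element, other_column) pairs from rows whose list is long enough.
    Rows with exhausted lists drop out of later batches, which is where
    skewed list lengths make the tail batches small and inefficient."""
    # rows: (list_value, other_value) pairs standing in for a cached table
    max_len = max(len(lst) for lst, _ in rows)
    for idx in range(max_len):
        yield [(lst[idx], other) for lst, other in rows if len(lst) > idx]

batches = list(explode_by_index([([1, 2], 100), ([3], 200), ([4, 5, 6], 300)]))
```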

I am also open to other ideas if someone has a good one.

@revans2 revans2 added feature request New feature or request Needs Triage Need team to review and classify Spark Functionality that helps Spark RAPIDS labels Sep 3, 2020
@harrism harrism added the libcudf Affects libcudf (C++/CUDA) code. label Sep 9, 2020
@kkraus14 kkraus14 removed the Needs Triage Need team to review and classify label Sep 15, 2020
@kkraus14 kkraus14 added the Python Affects Python cuDF API. label Sep 28, 2020
@hyperbolic2346 hyperbolic2346 self-assigned this Jan 4, 2021
rapids-bot bot pushed a commit that referenced this issue Jan 25, 2021
This is an operation that expands lists into rows and duplicates the existing rows from other columns. Explanation can be found in the issue #6151 

partially fixes #6151 

Missing pos_explode support required to completely close out #6151

Authors:
  - Mike Wilson (@hyperbolic2346)

Approvers:
  - Robert (Bobby) Evans (@revans2)
  - Jake Hemstad (@jrhemstad)
  - Karthikeyan (@karthikeyann)
  - @nvdbaranec

URL: #7140

harrism commented Jan 25, 2021

I think this was erroneously automatically closed by the merge of #7140. Reopening.

@harrism harrism reopened this Jan 25, 2021
@harrism harrism changed the title [FEA] memory effecient explode and pos_explode implementations [FEA] memory efficient explode and pos_explode implementations Jan 26, 2021
rapids-bot bot pushed a commit that referenced this issue Feb 26, 2021
This PR adds support for pos_explode in cuDF. It is very similar to explode, but includes an extra column holding the index of each exploded element within its source array.
```
a                b
[0, 1]          100
[2]             200
[3, 4, 5]       300
```

exploded on column a would result in
```
a         pos        b
0         0         100
1         1         100
2         0         200
3         0         300
4         1         300
5         2         300
```

partially fixes #6151

Authors:
  - Mike Wilson (@hyperbolic2346)

Approvers:
  - David (@davidwendt)
  - Jake Hemstad (@jrhemstad)

URL: #7376