[C++][Acero] Add the ability to merge already-sorted input nodes #38381

Closed
JerAguilon opened this issue Oct 20, 2023 · 3 comments · Fixed by #38380
@JerAguilon
Contributor

JerAguilon commented Oct 20, 2023

Describe the enhancement requested

I have a use case where I want to as-of join two datasets. Each side of the join (left and right) is sharded across N files, and each file is individually sorted by `ts` in ascending order. Imagine a dataset composed of two files:

file A

ts,col
1,"foo"
3,"foo"
5,"foo"

file B

ts,col
2,"bar"
3,"bar"
6,"bar"

After merging

ts,col
1,"foo"
2,"bar"
3,"bar"
3,"foo"
5,"foo"
6,"bar"

Sorting with order_by isn't tenable for huge datasets, because it has to accumulate the entire input in memory before it can emit anything. It would be more efficient to stream each input table and emit batches in sorted order via a heap.
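To make the heap idea concrete, here is a minimal standalone sketch of a k-way merge over inputs that are each already sorted ascending by `ts`. It is not the Acero implementation: `Row`, `Cursor`, `CursorGreater`, and `SortedMerge` are hypothetical names made up for illustration, the inputs are plain in-memory vectors rather than streamed batches, and breaking ties by input index is an assumption of this sketch (the issue does not specify how rows with equal timestamps are ordered).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <queue>
#include <string>
#include <vector>

// Hypothetical row type for illustration; not an Arrow or Acero type.
struct Row {
  int64_t ts;
  std::string col;
};

// Heap entry: points at the next unread row of one input.
struct Cursor {
  int64_t ts;         // timestamp of the row the cursor points at
  std::size_t input;  // which input the row comes from
  std::size_t pos;    // index of the row within that input
};

// std::priority_queue is a max-heap, so the comparison is inverted to keep
// the smallest ts on top. Ties go to the lower input index (an assumption).
struct CursorGreater {
  bool operator()(const Cursor& a, const Cursor& b) const {
    if (a.ts != b.ts) return a.ts > b.ts;
    return a.input > b.input;
  }
};

// Merges inputs that are each already sorted ascending by ts.
std::vector<Row> SortedMerge(const std::vector<std::vector<Row>>& inputs) {
  std::priority_queue<Cursor, std::vector<Cursor>, CursorGreater> heap;
  std::size_t total = 0;
  // Seed the heap with the first row of every non-empty input.
  for (std::size_t i = 0; i < inputs.size(); ++i) {
    total += inputs[i].size();
    if (!inputs[i].empty()) heap.push(Cursor{inputs[i][0].ts, i, 0});
  }

  std::vector<Row> out;
  out.reserve(total);
  while (!heap.empty()) {
    // Emit the globally smallest remaining row.
    const Cursor c = heap.top();
    heap.pop();
    out.push_back(inputs[c.input][c.pos]);
    // Advance that input's cursor and put it back on the heap.
    const std::size_t next = c.pos + 1;
    if (next < inputs[c.input].size()) {
      heap.push(Cursor{inputs[c.input][next].ts, c.input, next});
    }
  }
  return out;
}
```

The heap never holds more than one entry per input, so the merge does O(log N) work per emitted row for N inputs. Calling `SortedMerge` with file A and file B from the example yields the timestamps 1, 2, 3, 3, 5, 6.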

Merging N files this way is computationally similar to asof_join_node.cc: you can do it efficiently by buffering data from all of your input nodes and spawning a processing thread that emits the data in sorted order.

I propose refactoring some of the guts of asof_join_node.cc so that we can achieve the above computation. I think that this will unlock lots of potential that is hidden behind specialized databases like KDB.

I have a draft PR for the idea here: #38380 and have locally tested it for O(100GB) files.

Would be curious to get opinions from @icexelloss, @bkietz, and @westonpace on this approach!

Component(s)

C++

@icexelloss
Contributor

I think this is a useful operation to have

@JerAguilon
Contributor Author

PR is in a presentable state now--open to thoughts!

westonpace added a commit that referenced this issue Nov 6, 2023
### Rationale for this change

This is an implementation of a node that can merge N sorted inputs (only in ascending order for a first pass).

Where possible I have shared components with `asof_join_node.cc`.

Full description/use case is described in #38381

### What changes are included in this PR?

* Extract the relevant internals of `asof_join_node.cc` that stream data top to bottom and consume it in a non-blocking manner
* Implement a sorted merger

### Are these changes tested?

Basic test added. Locally I have tested this on 100+ gigabytes of Parquet data, sharded across 50+ files. Happy to add a benchmark test on top of the basic test, but submitting now for code feedback.

### Are there any user-facing changes?

Yes, `sorted_merge` is now an exposed declaration

Lead-authored-by: Jeremy Aguilon
Co-authored-by: jeremy
Co-authored-by: Weston Pace
Signed-off-by: Weston Pace

westonpace added this to the 15.0.0 milestone Nov 6, 2023

@westonpace
Member

Issue resolved by pull request #38380.

loicalleyne pushed a commit to loicalleyne/arrow that referenced this issue Nov 13, 2023
dgreiss pushed a commit to dgreiss/arrow that referenced this issue Feb 19, 2024