apacheGH-38381: [C++][Acero] Create a sorted merge node (apache#38380)
### Rationale for this change

This is an implementation of a node that can merge N sorted inputs (only in ascending order for a first pass).
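As a rough illustration of what merging N ascending-sorted inputs means, here is a minimal standalone sketch using a min-heap over plain integer vectors. It is not the code in this PR: the function name `MergeSortedInputs` is hypothetical, and the real node operates on streamed Arrow batches rather than in-memory vectors.

```cpp
#include <cstddef>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical helper (not part of Arrow): merge N ascending-sorted inputs
// into a single ascending-sorted output using a k-way heap merge.
std::vector<int> MergeSortedInputs(const std::vector<std::vector<int>>& inputs) {
  // Heap entry is (value, index of the input it came from).
  // std::greater makes std::priority_queue behave as a min-heap.
  using Entry = std::pair<int, std::size_t>;
  std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>> heap;

  // cursor[i] is the position of the next unread element of inputs[i].
  std::vector<std::size_t> cursor(inputs.size(), 0);
  std::size_t total = 0;
  for (std::size_t i = 0; i < inputs.size(); ++i) {
    total += inputs[i].size();
    if (!inputs[i].empty()) {
      heap.emplace(inputs[i][0], i);  // seed the heap with each input's head
    }
  }

  std::vector<int> merged;
  merged.reserve(total);
  while (!heap.empty()) {
    const Entry top = heap.top();  // smallest head among all inputs
    heap.pop();
    merged.push_back(top.first);
    const std::size_t i = top.second;
    // Advance the input we just consumed from and push its next element.
    if (++cursor[i] < inputs[i].size()) {
      heap.emplace(inputs[i][cursor[i]], i);
    }
  }
  return merged;
}
```

Each output element costs one pop and at most one push on a heap of at most N entries, so the merge runs in O(total · log N) time. Only ascending order is handled, matching the first-pass scope described above.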

Where possible I have shared components with `asof_join_node.cc`.

The full description and use case are in apache#38381.

### What changes are included in this PR?

* Factor out the parts of the as-of join node that stream data top to bottom and consume it in a non-blocking manner
* Implement a sorted merger

### Are these changes tested?

A basic test is added. Locally, I have tested this on 100+ gigabytes of Parquet data sharded across 50+ files. I am happy to add a benchmark on top of the basic test, but I am submitting now for code feedback.

### Are there any user-facing changes?

Yes, `sorted_merge` is now an exposed declaration.

Lead-authored-by: Jeremy Aguilon <[email protected]>
Co-authored-by: jeremy <[email protected]>
Co-authored-by: Weston Pace <[email protected]>
Signed-off-by: Weston Pace <[email protected]>
2 people authored and dgreiss committed Feb 17, 2024
1 parent 3e5b8b0 commit 7f1a22e
Showing 10 changed files with 1,357 additions and 379 deletions.
6 changes: 5 additions & 1 deletion cpp/src/arrow/acero/CMakeLists.txt
@@ -49,9 +49,11 @@ set(ARROW_ACERO_SRCS
  project_node.cc
  query_context.cc
  sink_node.cc
+ sorted_merge_node.cc
  source_node.cc
  swiss_join.cc
  task_util.cc
+ time_series_util.cc
  tpch_node.cc
  union_node.cc
  util.cc)
@@ -173,11 +175,13 @@ add_arrow_acero_test(hash_join_node_test SOURCES hash_join_node_test.cc
  add_arrow_acero_test(pivot_longer_node_test SOURCES pivot_longer_node_test.cc
  test_nodes.cc)

- # asof_join_node uses std::thread internally
+ # asof_join_node and sorted_merge_node use std::thread internally
  # and doesn't use ThreadPool so it will
  # be broken if threading is turned off
  if(ARROW_ENABLE_THREADING)
  add_arrow_acero_test(asof_join_node_test SOURCES asof_join_node_test.cc test_nodes.cc)
+ add_arrow_acero_test(sorted_merge_node_test SOURCES sorted_merge_node_test.cc
+ test_nodes.cc)
  endif()

  add_arrow_acero_test(tpch_node_test SOURCES tpch_node_test.cc)