-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-44526: [C++][Acero] Fix crash when thread in asof_join is not running #44584
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -1731,6 +1731,33 @@ TEST(AsofJoinTest, RhsEmptinessRaceEmptyBy) { | |||||||
ExecBatchFromJSON({int64(), utf8()}, R"([[1, "Z"], [2, "Z"], [3, "B"]])"); | ||||||||
AssertExecBatchesEqualIgnoringOrder(result.schema, {exp_batch}, result.batches); | ||||||||
} | ||||||||
// Reproduction of GH-44526: Provoke destruction of not started asofjoin node by providing | ||||||||
// a sink that fails on creation | ||||||||
TEST(AsofJoinTest, DetroyNotStartedAsofJoinNode) { | ||||||||
auto left_batch = ExecBatchFromJSON({int64()}, R"([[1], [2], [3]])"); | ||||||||
auto right_batch = | ||||||||
ExecBatchFromJSON({utf8(), int64()}, R"([["Z", 2], ["B", 3], ["A", 4]])"); | ||||||||
|
||||||||
Declaration left{"exec_batch_source", | ||||||||
ExecBatchSourceNodeOptions(schema({field("on", int64())}), | ||||||||
{std::move(left_batch)})}; | ||||||||
Declaration right{ | ||||||||
"exec_batch_source", | ||||||||
ExecBatchSourceNodeOptions(schema({field("colVals", utf8()), field("on", int64())}), | ||||||||
{std::move(right_batch)})}; | ||||||||
AsofJoinNodeOptions asof_join_opts({{{"on"}, {}}, {{"on"}, {}}}, 1); | ||||||||
Declaration asof_join{ | ||||||||
"asofjoin", {std::move(left), std::move(right)}, std::move(asof_join_opts)}; | ||||||||
|
||||||||
arrow::acero::SinkNodeOptions sink_node_options{nullptr, nullptr}; | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you add: 1) parameter name (e.g. |
||||||||
arrow::acero::Declaration sink = | ||||||||
arrow::acero::Declaration::Sequence({asof_join, {"sink", sink_node_options}}); | ||||||||
|
||||||||
EXPECT_RAISES_WITH_MESSAGE_THAT( | ||||||||
Invalid, | ||||||||
::testing::HasSubstr( | ||||||||
"`generator` is a required SinkNode option and cannot be null"), | ||||||||
DeclarationToStatus(std::move(sink))); | ||||||||
} | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
} // namespace acero | ||||||||
} // namespace arrow |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.