diff --git a/cpp/src/arrow/compute/exec/hash_join_node_test.cc b/cpp/src/arrow/compute/exec/hash_join_node_test.cc index ac65025a42afc..3f9234f119142 100644 --- a/cpp/src/arrow/compute/exec/hash_join_node_test.cc +++ b/cpp/src/arrow/compute/exec/hash_join_node_test.cc @@ -1147,5 +1147,77 @@ TEST(HashJoin, Random) { } } +TEST(ExecPlanExecution, ResidualFilter) { + for (bool parallel : {false, true}) { + SCOPED_TRACE(parallel ? "parallel/merged" : "serial"); + + BatchesWithSchema input_left; + input_left.batches = {ExecBatchFromJSON({int32(), int32(), utf8()}, R"([ + [1, 6, "alpha"], + [2, 5, "beta"], + [3, 4, "alpha"] + ])")}; + input_left.schema = + schema({field("l1", int32()), field("l2", int32()), field("l_str", utf8())}); + + BatchesWithSchema input_right; + input_right.batches = {ExecBatchFromJSON({int32(), int32(), utf8()}, R"([ + [5, 11, "alpha"], + [2, 12, "beta"], + [4, 16, "alpha"] + ])")}; + input_right.schema = + schema({field("r1", int32()), field("r2", int32()), field("r_str", utf8())}); + + auto exec_ctx = arrow::internal::make_unique( + default_memory_pool(), parallel ? arrow::internal::GetCpuThreadPool() : nullptr); + + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_ctx.get())); + AsyncGenerator> sink_gen; + + ExecNode* left_source; + ExecNode* right_source; + ASSERT_OK_AND_ASSIGN( + left_source, + MakeExecNode("source", plan.get(), {}, + SourceNodeOptions{input_left.schema, + input_left.gen(parallel, /*slow=*/false)})); + + ASSERT_OK_AND_ASSIGN( + right_source, + MakeExecNode("source", plan.get(), {}, + SourceNodeOptions{input_right.schema, + input_right.gen(parallel, /*slow=*/false)})) + + Expression mul = call("multiply", {field_ref("l1"), field_ref("l2")}); + Expression combination = call("add", {mul, field_ref("r1")}); + Expression residual_filter = less_equal(combination, field_ref("r2")); + + HashJoinNodeOptions join_opts{ + JoinType::FULL_OUTER, + /*left_keys=*/{"l_str"}, + /*right_keys=*/{"r_str"}, std::move(residual_filter), "l_", "r_"}; + + ASSERT_OK_AND_ASSIGN( + auto hashjoin, + MakeExecNode("hashjoin", plan.get(), {left_source, right_source}, join_opts)); + + ASSERT_OK_AND_ASSIGN(std::ignore, MakeExecNode("sink", plan.get(), {hashjoin}, + SinkNodeOptions{&sink_gen})); + + ASSERT_FINISHES_OK_AND_ASSIGN(auto result, StartAndCollect(plan.get(), sink_gen)); + + std::vector expected = { + ExecBatchFromJSON({int32(), int32(), utf8(), int32(), int32(), utf8()}, R"([ + [1, 6, "alpha", 4, 16, "alpha"], + [1, 6, "alpha", 5, 11, "alpha"], + [2, 5, "beta", 2, 12, "beta"], + [3, 4, "alpha", 4, 16, "alpha"]])")}; + std::cout << result[0].ToString() << std::endl; + + AssertExecBatchesEqual(hashjoin->output_schema(), result, expected); + } +} + } // namespace compute } // namespace arrow