Skip to content

Commit

Permalink
Add unit test for more complicated filter
Browse files Browse the repository at this point in the history
  • Loading branch information
save-buffer committed Nov 4, 2021
1 parent 6f287cb commit 718f2a3
Showing 1 changed file with 72 additions and 0 deletions.
72 changes: 72 additions & 0 deletions cpp/src/arrow/compute/exec/hash_join_node_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1147,5 +1147,77 @@ TEST(HashJoin, Random) {
}
}

TEST(ExecPlanExecution, ResidualFilter) {
for (bool parallel : {false, true}) {
SCOPED_TRACE(parallel ? "parallel/merged" : "serial");

BatchesWithSchema input_left;
input_left.batches = {ExecBatchFromJSON({int32(), int32(), utf8()}, R"([
[1, 6, "alpha"],
[2, 5, "beta"],
[3, 4, "alpha"]
])")};
input_left.schema =
schema({field("l1", int32()), field("l2", int32()), field("l_str", utf8())});

BatchesWithSchema input_right;
input_right.batches = {ExecBatchFromJSON({int32(), int32(), utf8()}, R"([
[5, 11, "alpha"],
[2, 12, "beta"],
[4, 16, "alpha"]
])")};
input_right.schema =
schema({field("r1", int32()), field("r2", int32()), field("r_str", utf8())});

auto exec_ctx = arrow::internal::make_unique<ExecContext>(
default_memory_pool(), parallel ? arrow::internal::GetCpuThreadPool() : nullptr);

ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_ctx.get()));
AsyncGenerator<util::optional<ExecBatch>> sink_gen;

ExecNode* left_source;
ExecNode* right_source;
ASSERT_OK_AND_ASSIGN(
left_source,
MakeExecNode("source", plan.get(), {},
SourceNodeOptions{input_left.schema,
input_left.gen(parallel, /*slow=*/false)}));

ASSERT_OK_AND_ASSIGN(
right_source,
MakeExecNode("source", plan.get(), {},
SourceNodeOptions{input_right.schema,
input_right.gen(parallel, /*slow=*/false)}))

Expression mul = call("multiply", {field_ref("l1"), field_ref("l2")});
Expression combination = call("add", {mul, field_ref("r1")});
Expression residual_filter = less_equal(combination, field_ref("r2"));

HashJoinNodeOptions join_opts{
JoinType::FULL_OUTER,
/*left_keys=*/{"l_str"},
/*right_keys=*/{"r_str"}, std::move(residual_filter), "l_", "r_"};

ASSERT_OK_AND_ASSIGN(
auto hashjoin,
MakeExecNode("hashjoin", plan.get(), {left_source, right_source}, join_opts));

ASSERT_OK_AND_ASSIGN(std::ignore, MakeExecNode("sink", plan.get(), {hashjoin},
SinkNodeOptions{&sink_gen}));

ASSERT_FINISHES_OK_AND_ASSIGN(auto result, StartAndCollect(plan.get(), sink_gen));

std::vector<ExecBatch> expected = {
ExecBatchFromJSON({int32(), int32(), utf8(), int32(), int32(), utf8()}, R"([
[1, 6, "alpha", 4, 16, "alpha"],
[1, 6, "alpha", 5, 11, "alpha"],
[2, 5, "beta", 2, 12, "beta"],
[3, 4, "alpha", 4, 16, "alpha"]])")};
std::cout << result[0].ToString() << std::endl;

AssertExecBatchesEqual(hashjoin->output_schema(), result, expected);
}
}

} // namespace compute
} // namespace arrow

0 comments on commit 718f2a3

Please sign in to comment.