support fine grained shuffle for window function #5048

Merged 52 commits, Jul 11, 2022
Commits
21ea8e1
support fine grained shuffle
guo-shaoge May 24, 2022
1524bef
add unit test for StreamingDAGResponseWriter
guo-shaoge Jun 6, 2022
d860c45
add perf test for window function
guo-shaoge Jun 6, 2022
119b462
Merge branch 'master' of github.com:pingcap/tiflash into fine_grained…
guo-shaoge Jun 8, 2022
1e86122
fix conflict
guo-shaoge Jun 8, 2022
6dc4eab
fix contrib change
guo-shaoge Jun 8, 2022
2260514
fix some comment
guo-shaoge Jun 9, 2022
c1585b1
fix
guo-shaoge Jun 10, 2022
8211475
fix executeOrder()
guo-shaoge Jun 10, 2022
bcdf69f
Merge branch 'master' of github.com:pingcap/tiflash into fine_grained…
guo-shaoge Jun 15, 2022
162b1e3
add interpreter unittest
guo-shaoge Jun 15, 2022
df8aef7
Merge branch 'master' of github.com:pingcap/tiflash into fine_grained…
guo-shaoge Jun 16, 2022
d03f0eb
refine microbenchmark
guo-shaoge Jun 16, 2022
e5a01cf
rm exchange_perftest.cpp
guo-shaoge Jun 16, 2022
278240c
update kvproto dep
guo-shaoge Jun 20, 2022
0d4bf2e
fix fmt
guo-shaoge Jun 20, 2022
a4ee8a8
fix lint
guo-shaoge Jun 20, 2022
73e314d
Merge branch 'master' into fine_grained_shuffle
SeaRise Jun 21, 2022
eb605ce
Merge branch 'master' of github.com:pingcap/tiflash into fine_grained…
guo-shaoge Jun 23, 2022
e3d3ff3
fix conflict
guo-shaoge Jun 23, 2022
e8b9747
Merge branch 'fine_grained_shuffle' of github.com:guo-shaoge/tics int…
guo-shaoge Jun 23, 2022
db50925
enable fine_grained_shuffle in fragment level
guo-shaoge Jun 28, 2022
8624a85
Merge branch 'master' into fine_grained_shuffle
guo-shaoge Jun 29, 2022
c20c74c
stream_count from uint32_t to uint64_t
guo-shaoge Jun 30, 2022
b5df200
fix testcase
guo-shaoge Jul 1, 2022
4df7c9a
fix minor type
guo-shaoge Jul 1, 2022
d78a055
Merge branch 'fine_grained_shuffle' of github.com:guo-shaoge/tics int…
guo-shaoge Jul 1, 2022
9210111
fix comment
guo-shaoge Jul 1, 2022
e52fc32
add extra_info for stream
guo-shaoge Jul 2, 2022
6fbeb02
make enable_fine_grained_shuffle as template argument
guo-shaoge Jul 3, 2022
709ac80
change uint64_t to UInt64
guo-shaoge Jul 3, 2022
2c7ab24
fix some comment, add fullstack test
guo-shaoge Jul 4, 2022
505f49a
update tipb
guo-shaoge Jul 4, 2022
b1de8cd
Merge branch 'master' into fine_grained_shuffle
guo-shaoge Jul 4, 2022
383b89e
fix fmt
guo-shaoge Jul 4, 2022
db8497f
update window.test
guo-shaoge Jul 4, 2022
eda9596
Merge branch 'fine_grained_shuffle' of github.com:guo-shaoge/tics int…
guo-shaoge Jul 4, 2022
fb13e0e
fix some comments
guo-shaoge Jul 5, 2022
c23487a
update kvproto
guo-shaoge Jul 5, 2022
6e9f7b9
update kvproto
guo-shaoge Jul 5, 2022
030b874
fix bunch of comments
guo-shaoge Jul 7, 2022
59e3cb8
update
guo-shaoge Jul 7, 2022
c64d8ef
move send_exec_summary_at_last to top; Add RUNTIME_CHECK in DAGQueryB…
guo-shaoge Jul 7, 2022
c9ac908
using unique_ptr for msg_channels; use min(stream_count, max_stream)
guo-shaoge Jul 7, 2022
5893b37
Merge branch 'master' into fine_grained_shuffle
guo-shaoge Jul 7, 2022
0f035ec
fix
guo-shaoge Jul 7, 2022
031d1ed
Merge branch 'fine_grained_shuffle' of github.com:guo-shaoge/tics int…
guo-shaoge Jul 7, 2022
4f05ba7
Merge branch 'master' of github.com:pingcap/tiflash into fine_grained…
guo-shaoge Jul 8, 2022
8c34265
update tiflash-proxy
guo-shaoge Jul 8, 2022
ec3fe0d
fix conflict(mockExecutor.cpp)
guo-shaoge Jul 8, 2022
7b35d25
Merge branch 'master' into fine_grained_shuffle
guo-shaoge Jul 8, 2022
920673c
Merge branch 'master' into fine_grained_shuffle
guo-shaoge Jul 10, 2022
Merge branch 'master' into fine_grained_shuffle
guo-shaoge authored Jul 10, 2022

commit 920673cbee43730b08f2208962b8e6a6e1002558
245 changes: 245 additions & 0 deletions dbms/src/Flash/tests/gtest_interpreter.cpp
@@ -412,5 +412,250 @@ Union: <for test>
}
CATCH

TEST_F(InterpreterExecuteTest, Join)
try
{
// TODO: Find a way to write the request easier.
{
// Join Source.
DAGRequestBuilder table1 = context.scan("test_db", "r_table");
DAGRequestBuilder table2 = context.scan("test_db", "l_table");
DAGRequestBuilder table3 = context.scan("test_db", "r_table");
DAGRequestBuilder table4 = context.scan("test_db", "l_table");

auto request = table1.join(
table2.join(
table3.join(table4,
{col("join_c")},
ASTTableJoin::Kind::Left),
{col("join_c")},
ASTTableJoin::Kind::Left),
{col("join_c")},
ASTTableJoin::Kind::Left)
.build(context);

String expected = R"(
CreatingSets
Union: <for join>
HashJoinBuildBlockInputStream x 10: <join build, build_side_root_executor_id = table_scan_3>, join_kind = Left
Expression: <append join key and join filters for build side>
Expression: <final projection>
MockTableScan
Union x 2: <for join>
HashJoinBuildBlockInputStream x 10: <join build, build_side_root_executor_id = Join_4>, join_kind = Left
Expression: <append join key and join filters for build side>
Expression: <final projection>
Expression: <remove useless column after join>
HashJoinProbe: <join probe, join_executor_id = Join_4>
Expression: <final projection>
MockTableScan
Union: <for test>
Expression x 10: <final projection>
Expression: <remove useless column after join>
HashJoinProbe: <join probe, join_executor_id = Join_6>
Expression: <final projection>
MockTableScan)";
ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10);
}

{
// only join + ExchangeReceiver
DAGRequestBuilder receiver1 = context.receive("sender_l");
DAGRequestBuilder receiver2 = context.receive("sender_r");
DAGRequestBuilder receiver3 = context.receive("sender_l");
DAGRequestBuilder receiver4 = context.receive("sender_r");

auto request = receiver1.join(
receiver2.join(
receiver3.join(receiver4,
{col("join_c")},
ASTTableJoin::Kind::Left),
{col("join_c")},
ASTTableJoin::Kind::Left),
{col("join_c")},
ASTTableJoin::Kind::Left)
.build(context);

String expected = R"(
CreatingSets
Union: <for join>
HashJoinBuildBlockInputStream x 10: <join build, build_side_root_executor_id = exchange_receiver_3>, join_kind = Left
Expression: <append join key and join filters for build side>
Expression: <final projection>
MockExchangeReceiver
Union x 2: <for join>
HashJoinBuildBlockInputStream x 10: <join build, build_side_root_executor_id = Join_4>, join_kind = Left
Expression: <append join key and join filters for build side>
Expression: <final projection>
Expression: <remove useless column after join>
HashJoinProbe: <join probe, join_executor_id = Join_4>
Expression: <final projection>
MockExchangeReceiver
Union: <for test>
Expression x 10: <final projection>
Expression: <remove useless column after join>
HashJoinProbe: <join probe, join_executor_id = Join_6>
Expression: <final projection>
MockExchangeReceiver)";
ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10);
}

{
// join + receiver + sender
DAGRequestBuilder receiver1 = context.receive("sender_l");
DAGRequestBuilder receiver2 = context.receive("sender_r");
DAGRequestBuilder receiver3 = context.receive("sender_l");
DAGRequestBuilder receiver4 = context.receive("sender_r");

auto request = receiver1.join(
receiver2.join(
receiver3.join(receiver4,
{col("join_c")},
ASTTableJoin::Kind::Left),
{col("join_c")},
ASTTableJoin::Kind::Left),
{col("join_c")},
ASTTableJoin::Kind::Left)
.exchangeSender(tipb::PassThrough)
.build(context);

String expected = R"(
CreatingSets
Union: <for join>
HashJoinBuildBlockInputStream x 10: <join build, build_side_root_executor_id = exchange_receiver_3>, join_kind = Left
Expression: <append join key and join filters for build side>
Expression: <final projection>
MockExchangeReceiver
Union x 2: <for join>
HashJoinBuildBlockInputStream x 10: <join build, build_side_root_executor_id = Join_4>, join_kind = Left
Expression: <append join key and join filters for build side>
Expression: <final projection>
Expression: <remove useless column after join>
HashJoinProbe: <join probe, join_executor_id = Join_4>
Expression: <final projection>
MockExchangeReceiver
Union: <for test>
MockExchangeSender x 10
Expression: <final projection>
Expression: <remove useless column after join>
HashJoinProbe: <join probe, join_executor_id = Join_6>
Expression: <final projection>
MockExchangeReceiver)";
ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10);
}
}
CATCH

TEST_F(InterpreterExecuteTest, JoinThenAgg)
try
{
{
// Left Join.
DAGRequestBuilder table1 = context.scan("test_db", "r_table");
DAGRequestBuilder table2 = context.scan("test_db", "l_table");

auto request = table1.join(
table2,
{col("join_c")},
ASTTableJoin::Kind::Left)
.aggregation({Max(col("r_a"))}, {col("join_c")})
.build(context);
String expected = R"(
CreatingSets
Union: <for join>
HashJoinBuildBlockInputStream x 10: <join build, build_side_root_executor_id = table_scan_1>, join_kind = Left
Expression: <append join key and join filters for build side>
Expression: <final projection>
MockTableScan
Union: <for test>
Expression x 10: <final projection>
SharedQuery: <restore concurrency>
ParallelAggregating, max_threads: 10, final: true
Expression x 10: <before aggregation>
Expression: <remove useless column after join>
HashJoinProbe: <join probe, join_executor_id = Join_2>
Expression: <final projection>
MockTableScan)";
ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10);
}

{
// Right Join
DAGRequestBuilder table1 = context.scan("test_db", "r_table");
DAGRequestBuilder table2 = context.scan("test_db", "l_table");

auto request = table1.join(
table2,
{col("join_c")},
ASTTableJoin::Kind::Right)
.aggregation({Max(col("r_a"))}, {col("join_c")})
.build(context);
String expected = R"(
CreatingSets
Union: <for join>
HashJoinBuildBlockInputStream x 10: <join build, build_side_root_executor_id = table_scan_1>, join_kind = Right
Expression: <append join key and join filters for build side>
Expression: <final projection>
MockTableScan
Union: <for test>
Expression x 10: <final projection>
SharedQuery: <restore concurrency>
ParallelAggregating, max_threads: 10, final: true
Expression x 10: <before aggregation>
Expression: <remove useless column after join>
HashJoinProbe: <join probe, join_executor_id = Join_2>
Expression: <append join key and join filters for probe side>
Expression: <final projection>
MockTableScan
Expression x 10: <before aggregation>
Expression: <remove useless column after join>
NonJoined: <add stream with non_joined_data if full_or_right_join>)";
ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10);
}

{
// Right join + receiver + sender
DAGRequestBuilder receiver1 = context.receive("sender_l");
DAGRequestBuilder receiver2 = context.receive("sender_r");

auto request = receiver1.join(
receiver2,
{col("join_c")},
ASTTableJoin::Kind::Right)
.aggregation({Sum(col("r_a"))}, {col("join_c")})
.exchangeSender(tipb::PassThrough)
.limit(10)
.build(context);
String expected = R"(
CreatingSets
Union: <for join>
HashJoinBuildBlockInputStream x 20: <join build, build_side_root_executor_id = exchange_receiver_1>, join_kind = Right
Expression: <append join key and join filters for build side>
Expression: <final projection>
MockExchangeReceiver
Union: <for test>
MockExchangeSender x 20
SharedQuery: <restore concurrency>
Limit, limit = 10
Union: <for partial limit>
Limit x 20, limit = 10
Expression: <final projection>
Expression: <before order and select>
SharedQuery: <restore concurrency>
ParallelAggregating, max_threads: 20, final: true
Expression x 20: <before aggregation>
Expression: <remove useless column after join>
HashJoinProbe: <join probe, join_executor_id = Join_2>
Expression: <append join key and join filters for probe side>
Expression: <final projection>
MockExchangeReceiver
Expression x 20: <before aggregation>
Expression: <remove useless column after join>
NonJoined: <add stream with non_joined_data if full_or_right_join>)";
ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 20);
}
}
CATCH

} // namespace tests
} // namespace DB
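
The expected strings in these tests describe a tree of input streams, where a line such as `Expression x 10: <final projection>` stands for ten identical sibling streams collapsed into one entry. As a rough illustration of how such a dump could be rendered, here is a minimal sketch. `StreamNode` and `dumpTree` are hypothetical names invented for this example, not TiFlash types, and the one-space-per-level indentation is an assumption, since the diff view above has lost its leading whitespace.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical node type for illustration only; not TiFlash's IBlockInputStream.
struct StreamNode
{
    std::string name;       // e.g. "Expression"
    std::string extra_info; // e.g. "<final projection>"; may be empty
    std::size_t copies = 1; // identical sibling streams collapsed into one line
    std::vector<StreamNode> children;
};

// Append one line per node; children are indented one extra space per level
// (assumed indentation width).
void dumpTree(const StreamNode & node, std::size_t depth, std::string & out)
{
    assert(node.copies >= 1);
    out.append(depth, ' ');
    out += node.name;
    if (node.copies > 1)
        out += " x " + std::to_string(node.copies);
    if (!node.extra_info.empty())
        out += ": " + node.extra_info;
    out += '\n';
    for (const auto & child : node.children)
        dumpTree(child, depth + 1, out);
}

// Convenience wrapper that returns the whole dump as a string.
std::string dumpTree(const StreamNode & root)
{
    std::string out;
    dumpTree(root, 0, out);
    return out;
}
```

Under these assumptions, a test can compare the returned string against an expected raw string literal, which is the shape of check that `ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10)` appears to perform on the real stream tree.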