Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support fine grained shuffle for window function #5048

Merged
merged 52 commits into from
Jul 11, 2022
Merged
Changes from 1 commit
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
21ea8e1
support fine grained shuffle
guo-shaoge May 24, 2022
1524bef
add unit test for StreamingDAGResponseWriter
guo-shaoge Jun 6, 2022
d860c45
add perf test for window function
guo-shaoge Jun 6, 2022
119b462
Merge branch 'master' of github.com:pingcap/tiflash into fine_grained…
guo-shaoge Jun 8, 2022
1e86122
fix conflict
guo-shaoge Jun 8, 2022
6dc4eab
fix contrib change
guo-shaoge Jun 8, 2022
2260514
fix some comment
guo-shaoge Jun 9, 2022
c1585b1
fix
guo-shaoge Jun 10, 2022
8211475
fix executeOrder()
guo-shaoge Jun 10, 2022
bcdf69f
Merge branch 'master' of github.com:pingcap/tiflash into fine_grained…
guo-shaoge Jun 15, 2022
162b1e3
add interpreter unittest
guo-shaoge Jun 15, 2022
df8aef7
Merge branch 'master' of github.com:pingcap/tiflash into fine_grained…
guo-shaoge Jun 16, 2022
d03f0eb
refine microbenchmark
guo-shaoge Jun 16, 2022
e5a01cf
rm exchange_perftest.cpp
guo-shaoge Jun 16, 2022
278240c
update kvproto dep
guo-shaoge Jun 20, 2022
0d4bf2e
fix fmt
guo-shaoge Jun 20, 2022
a4ee8a8
fix lint
guo-shaoge Jun 20, 2022
73e314d
Merge branch 'master' into fine_grained_shuffle
SeaRise Jun 21, 2022
eb605ce
Merge branch 'master' of github.com:pingcap/tiflash into fine_grained…
guo-shaoge Jun 23, 2022
e3d3ff3
fix conflict
guo-shaoge Jun 23, 2022
e8b9747
Merge branch 'fine_grained_shuffle' of github.com:guo-shaoge/tics int…
guo-shaoge Jun 23, 2022
db50925
enable fine_grained_shuffle in fragment level
guo-shaoge Jun 28, 2022
8624a85
Merge branch 'master' into fine_grained_shuffle
guo-shaoge Jun 29, 2022
c20c74c
stream_count from uint32_t to uint64_t
guo-shaoge Jun 30, 2022
b5df200
fix testcase
guo-shaoge Jul 1, 2022
4df7c9a
fix minor type
guo-shaoge Jul 1, 2022
d78a055
Merge branch 'fine_grained_shuffle' of github.com:guo-shaoge/tics int…
guo-shaoge Jul 1, 2022
9210111
fix comment
guo-shaoge Jul 1, 2022
e52fc32
add extra_info for stream
guo-shaoge Jul 2, 2022
6fbeb02
make enable_fine_grained_shuffle as template argument
guo-shaoge Jul 3, 2022
709ac80
change uint64_t to UInt64
guo-shaoge Jul 3, 2022
2c7ab24
fix some comment, add fullstack test
guo-shaoge Jul 4, 2022
505f49a
update tipb
guo-shaoge Jul 4, 2022
b1de8cd
Merge branch 'master' into fine_grained_shuffle
guo-shaoge Jul 4, 2022
383b89e
fix fmt
guo-shaoge Jul 4, 2022
db8497f
update window.test
guo-shaoge Jul 4, 2022
eda9596
Merge branch 'fine_grained_shuffle' of github.com:guo-shaoge/tics int…
guo-shaoge Jul 4, 2022
fb13e0e
fix some comments
guo-shaoge Jul 5, 2022
c23487a
update kvproto
guo-shaoge Jul 5, 2022
6e9f7b9
update kvproto
guo-shaoge Jul 5, 2022
030b874
fix bunch of comments
guo-shaoge Jul 7, 2022
59e3cb8
update
guo-shaoge Jul 7, 2022
c64d8ef
move send_exec_summary_at_last to top; Add RUNTIME_CHECK in DAGQueryB…
guo-shaoge Jul 7, 2022
c9ac908
using unique_ptr for msg_channels; use min(stream_count, max_stream)
guo-shaoge Jul 7, 2022
5893b37
Merge branch 'master' into fine_grained_shuffle
guo-shaoge Jul 7, 2022
0f035ec
fix
guo-shaoge Jul 7, 2022
031d1ed
Merge branch 'fine_grained_shuffle' of github.com:guo-shaoge/tics int…
guo-shaoge Jul 7, 2022
4f05ba7
Merge branch 'master' of github.com:pingcap/tiflash into fine_grained…
guo-shaoge Jul 8, 2022
8c34265
update tiflash-proxy
guo-shaoge Jul 8, 2022
ec3fe0d
fix conflict(mockExecutor.cpp)
guo-shaoge Jul 8, 2022
7b35d25
Merge branch 'master' into fine_grained_shuffle
guo-shaoge Jul 8, 2022
920673c
Merge branch 'master' into fine_grained_shuffle
guo-shaoge Jul 10, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix some comment, add fullstack test
Signed-off-by: guo-shaoge <shaoge1994@163.com>
guo-shaoge committed Jul 4, 2022

Verified

This commit was signed with the committer’s verified signature.
nbbeeken Neal Beeken
commit 2c7ab24867b265a970fd7abb3df6bb276ad4ff2e
7 changes: 6 additions & 1 deletion dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp
Original file line number Diff line number Diff line change
@@ -368,13 +368,14 @@ void computeHash(const Block & input_block,
}
}

/// hash exchanging data among only TiFlash nodes.
/// Hash exchanging data among only TiFlash nodes. Only be called when enable_fine_grained_shuffle is false.
template <class StreamWriterPtr, bool enable_fine_grained_shuffle>
template <bool send_exec_summary_at_last>
void StreamingDAGResponseWriter<StreamWriterPtr, enable_fine_grained_shuffle>::partitionAndEncodeThenWriteBlocks(
std::vector<Block> & input_blocks,
tipb::SelectResponse & response) const
{
static_assert(!enable_fine_grained_shuffle);
std::vector<mpp::MPPDataPacket> packet(partition_num);
std::vector<size_t> responses_row_count(partition_num);
handleExecSummary<send_exec_summary_at_last>(input_blocks, packet, response);
@@ -406,10 +407,12 @@ void StreamingDAGResponseWriter<StreamWriterPtr, enable_fine_grained_shuffle>::p
writePackets<send_exec_summary_at_last>(responses_row_count, packet);
}

/// Hash exchanging data among only TiFlash nodes. Only be called when enable_fine_grained_shuffle is true.
template <class StreamWriterPtr, bool enable_fine_grained_shuffle>
template <bool send_exec_summary_at_last>
void StreamingDAGResponseWriter<StreamWriterPtr, enable_fine_grained_shuffle>::batchWriteFineGrainedShuffle()
{
static_assert(enable_fine_grained_shuffle);
assert(exchange_type == tipb::ExchangeType::Hash);
assert(fine_grained_shuffle_stream_count > 0);
assert(fine_grained_shuffle_batch_size > 0);
@@ -430,6 +433,7 @@ void StreamingDAGResponseWriter<StreamWriterPtr, enable_fine_grained_shuffle>::b
initInputBlocks(blocks);
initDestColumns(blocks[0], final_dest_tbl_columns);

// Hash partition input_blocks into bucket_num.
for (const auto & block : blocks)
{
std::vector<String> partition_key_containers(collators.size());
@@ -446,6 +450,7 @@ void StreamingDAGResponseWriter<StreamWriterPtr, enable_fine_grained_shuffle>::b
}
}

// For i-th stream_count buckets, send to i-th tiflash node.
for (size_t bucket_idx = 0; bucket_idx < bucket_num; bucket_idx += fine_grained_shuffle_stream_count)
{
size_t part_id = bucket_idx / fine_grained_shuffle_stream_count; // NOLINT(clang-analyzer-core.DivideZero)
1 change: 0 additions & 1 deletion dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h
Original file line number Diff line number Diff line change
@@ -56,7 +56,6 @@ class StreamingDAGResponseWriter : public DAGResponseWriter
void batchWrite();
template <bool send_exec_summary_at_last>
void batchWriteFineGrainedShuffle();
bool canUseFineGrainedShuffle() const;

template <bool send_exec_summary_at_last>
void encodeThenWriteBlocks(const std::vector<Block> & input_blocks, tipb::SelectResponse & response) const;
1 change: 0 additions & 1 deletion dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
@@ -409,7 +409,6 @@ ExchangeReceiverBase<RPCContext>::ExchangeReceiverBase(
{
try
{
// TODO: just bool is enough
if (enable_fine_grained_shuffle_)
{
for (size_t i = 0; i < max_streams_; ++i)
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/ExchangeReceiver.h
Original file line number Diff line number Diff line change
@@ -37,7 +37,7 @@ struct ReceivedMessage
{
size_t source_index;
String req_info;
// shared_ptr<const MPPDataPacket> is copied, so error_ptr, resp_ptr and chunks are valid.
// shared_ptr<const MPPDataPacket> is copied to make sure error_ptr, resp_ptr and chunks are valid.
const std::shared_ptr<const MPPDataPacket> packet;
const mpp::Error * error_ptr;
const String * resp_ptr;
32 changes: 32 additions & 0 deletions tests/fullstack-test/mpp/window.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Copyright 2022 PingCAP, Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

mysql> drop table if exists test.t1;
mysql> create table test.t1(c1 int, c2 int);
mysql> insert into test.t1 values(1, 1),(2, 2),(3, 3),(1, 1),(2, 2),(3, 3),(4, 4);
mysql> alter table test.t1 set tiflash replica 1;
func> wait_table test test.t1
mysql> use test; set @@tidb_isolation_read_engines='tiflash'; select c1, c2, row_number() over w2, row_number() over w1 from test.t1 window w1 as(partition by c1), w2 as (partition by c1, c2) order by 1, 2, 3, 4;
+------+------+----------------------+----------------------+
| c1 | c2 | row_number() over w2 | row_number() over w1 |
+------+------+----------------------+----------------------+
| 1 | 1 | 1 | 1 |
| 1 | 1 | 2 | 2 |
| 2 | 2 | 1 | 1 |
| 2 | 2 | 2 | 2 |
| 3 | 3 | 1 | 1 |
| 3 | 3 | 2 | 2 |
| 4 | 4 | 1 | 1 |
+------+------+----------------------+----------------------+
mysql> drop table if exists test.t1;