From ef34f3b92d0c8b897b3c27815a1da4c913b1621b Mon Sep 17 00:00:00 2001 From: SeaRise Date: Tue, 27 Sep 2022 18:43:44 +0800 Subject: [PATCH] .*: Separate `FineGrainedShuffleWriter`, `HashParitionWriter`, `BroadcastOrPassThroughWriter` from `StreamingDAGResponseWriter` (#5910) ref pingcap/tiflash#5900 --- dbms/src/Flash/Coprocessor/DAGDriver.cpp | 9 +- .../Coprocessor/DAGQueryBlockInterpreter.cpp | 57 +-- .../Flash/Coprocessor/DAGResponseWriter.cpp | 21 +- .../src/Flash/Coprocessor/DAGResponseWriter.h | 28 +- .../StreamingDAGResponseWriter.cpp | 394 ++------------- .../Coprocessor/StreamingDAGResponseWriter.h | 45 +- .../tests/gtest_streaming_dag_writer.cpp | 184 ------- .../tests/gtest_streaming_writer.cpp | 222 +++++++++ .../Mpp/BroadcastOrPassThroughWriter.cpp | 102 ++++ .../Flash/Mpp/BroadcastOrPassThroughWriter.h | 49 ++ .../Flash/Mpp/FineGrainedShuffleWriter.cpp | 167 +++++++ dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h | 59 +++ dbms/src/Flash/Mpp/HashBaseWriterHelper.cpp | 80 +++ dbms/src/Flash/Mpp/HashBaseWriterHelper.h | 32 ++ dbms/src/Flash/Mpp/HashParitionWriter.h | 57 +++ dbms/src/Flash/Mpp/HashPartitionWriter.cpp | 142 ++++++ dbms/src/Flash/Mpp/newMPPExchangeWriter.h | 87 ++++ .../Mpp/tests/gtest_mpp_exchange_writer.cpp | 455 ++++++++++++++++++ .../Planner/plans/PhysicalExchangeSender.cpp | 57 +-- dbms/src/Flash/tests/bench_exchange.cpp | 29 +- 20 files changed, 1582 insertions(+), 694 deletions(-) delete mode 100644 dbms/src/Flash/Coprocessor/tests/gtest_streaming_dag_writer.cpp create mode 100644 dbms/src/Flash/Coprocessor/tests/gtest_streaming_writer.cpp create mode 100644 dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp create mode 100644 dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.h create mode 100644 dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp create mode 100644 dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h create mode 100644 dbms/src/Flash/Mpp/HashBaseWriterHelper.cpp create mode 100644 dbms/src/Flash/Mpp/HashBaseWriterHelper.h create mode 100644 dbms/src/Flash/Mpp/HashParitionWriter.h create mode 100644 dbms/src/Flash/Mpp/HashPartitionWriter.cpp create mode 100644 dbms/src/Flash/Mpp/newMPPExchangeWriter.h create mode 100644 dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp diff --git a/dbms/src/Flash/Coprocessor/DAGDriver.cpp b/dbms/src/Flash/Coprocessor/DAGDriver.cpp index 5df5c95c6e0..5b355295401 100644 --- a/dbms/src/Flash/Coprocessor/DAGDriver.cpp +++ b/dbms/src/Flash/Coprocessor/DAGDriver.cpp @@ -128,17 +128,12 @@ try auto streaming_writer = std::make_shared(writer); TiDB::TiDBCollators collators; - std::unique_ptr response_writer = std::make_unique>( + std::unique_ptr response_writer = std::make_unique>( streaming_writer, - std::vector(), - collators, - tipb::ExchangeType::PassThrough, context.getSettingsRef().dag_records_per_chunk, context.getSettingsRef().batch_send_min_limit, true, - dag_context, - /*fine_grained_shuffle_stream_count=*/0, - /*fine_grained_shuffle_batch_size=*/0); + dag_context); dag_output_stream = std::make_shared(streams.in->getHeader(), std::move(response_writer)); copyData(*streams.in, *dag_output_stream); } diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index 72cb7bbefbe..a764fdb791d 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -44,8 +44,8 @@ #include #include #include -#include #include +#include #include #include #include @@ -738,44 +738,31 @@ void 
DAGQueryBlockInterpreter::handleExchangeSender(DAGPipeline & pipeline) const uint64_t stream_count = query_block.exchange_sender->fine_grained_shuffle_stream_count(); const uint64_t batch_size = query_block.exchange_sender->fine_grained_shuffle_batch_size(); - if (enableFineGrainedShuffle(stream_count)) + auto enable_fine_grained_shuffle = enableFineGrainedShuffle(stream_count); + String extra_info; + if (enable_fine_grained_shuffle) { - pipeline.transform([&](auto & stream) { - // construct writer - std::unique_ptr response_writer = std::make_unique>( - context.getDAGContext()->tunnel_set, - partition_col_ids, - partition_col_collators, - exchange_sender.tp(), - context.getSettingsRef().dag_records_per_chunk, - context.getSettingsRef().batch_send_min_limit, - stream_id++ == 0, /// only one stream needs to sending execution summaries for the last response - dagContext(), - stream_count, - batch_size); - stream = std::make_shared(stream, std::move(response_writer), log->identifier()); - stream->setExtraInfo(String(enableFineGrainedShuffleExtraInfo)); - }); + extra_info = String(enableFineGrainedShuffleExtraInfo); RUNTIME_CHECK(exchange_sender.tp() == tipb::ExchangeType::Hash, ExchangeType_Name(exchange_sender.tp())); RUNTIME_CHECK(stream_count <= 1024, stream_count); } - else - { - pipeline.transform([&](auto & stream) { - std::unique_ptr response_writer = std::make_unique>( - context.getDAGContext()->tunnel_set, - partition_col_ids, - partition_col_collators, - exchange_sender.tp(), - context.getSettingsRef().dag_records_per_chunk, - context.getSettingsRef().batch_send_min_limit, - stream_id++ == 0, /// only one stream needs to sending execution summaries for the last response - dagContext(), - stream_count, - batch_size); - stream = std::make_shared(stream, std::move(response_writer), log->identifier()); - }); - } + pipeline.transform([&](auto & stream) { + // construct writer + std::unique_ptr response_writer = newMPPExchangeWriter( + context.getDAGContext()->tunnel_set, + partition_col_ids, + partition_col_collators, + exchange_sender.tp(), + context.getSettingsRef().dag_records_per_chunk, + context.getSettingsRef().batch_send_min_limit, + stream_id++ == 0, /// only one stream needs to sending execution summaries for the last response + dagContext(), + enable_fine_grained_shuffle, + stream_count, + batch_size); + stream = std::make_shared(stream, std::move(response_writer), log->identifier()); + stream->setExtraInfo(extra_info); + }); } void DAGQueryBlockInterpreter::handleMockExchangeSender(DAGPipeline & pipeline) diff --git a/dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp b/dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp index fe4e9c23bca..e3cafd268f0 100644 --- a/dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp @@ -18,6 +18,24 @@ namespace DB { +void ExecutionSummary::merge(const ExecutionSummary & other, bool streaming_call) +{ + if (streaming_call) + { + time_processed_ns = std::max(time_processed_ns, other.time_processed_ns); + num_produced_rows = std::max(num_produced_rows, other.num_produced_rows); + num_iterations = std::max(num_iterations, other.num_iterations); + concurrency = std::max(concurrency, other.concurrency); + } + else + { + time_processed_ns = std::max(time_processed_ns, other.time_processed_ns); + num_produced_rows += other.num_produced_rows; + num_iterations += other.num_iterations; + concurrency += other.concurrency; + } +} + /// delta_mode means when for a streaming call, return the delta execution 
summary /// because TiDB is not aware of the streaming call when it handle the execution summaries /// so we need to "pretend to be a unary call", can be removed if TiDB support streaming @@ -179,7 +197,8 @@ DAGResponseWriter::DAGResponseWriter( { records_per_chunk = -1; } - if (dag_context.encode_type != tipb::EncodeType::TypeCHBlock && dag_context.encode_type != tipb::EncodeType::TypeChunk + if (dag_context.encode_type != tipb::EncodeType::TypeCHBlock + && dag_context.encode_type != tipb::EncodeType::TypeChunk && dag_context.encode_type != tipb::EncodeType::TypeDefault) { throw TiFlashException( diff --git a/dbms/src/Flash/Coprocessor/DAGResponseWriter.h b/dbms/src/Flash/Coprocessor/DAGResponseWriter.h index 2267d143bf6..fba7b011d85 100644 --- a/dbms/src/Flash/Coprocessor/DAGResponseWriter.h +++ b/dbms/src/Flash/Coprocessor/DAGResponseWriter.h @@ -14,15 +14,9 @@ #pragma once -#include -#include -#include -#include -#include -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wunused-parameter" +#include +#include #include -#pragma GCC diagnostic pop namespace DB { @@ -40,23 +34,7 @@ struct ExecutionSummary , concurrency(0) {} - void merge(const ExecutionSummary & other, bool streaming_call) - { - if (streaming_call) - { - time_processed_ns = std::max(time_processed_ns, other.time_processed_ns); - num_produced_rows = std::max(num_produced_rows, other.num_produced_rows); - num_iterations = std::max(num_iterations, other.num_iterations); - concurrency = std::max(concurrency, other.concurrency); - } - else - { - time_processed_ns = std::max(time_processed_ns, other.time_processed_ns); - num_produced_rows += other.num_produced_rows; - num_iterations += other.num_iterations; - concurrency += other.concurrency; - } - } + void merge(const ExecutionSummary & other, bool streaming_call); }; class DAGResponseWriter diff --git a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp index 5fa7eb245ab..ab0852a1a59 100644 --- a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp +++ b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp @@ -14,15 +14,12 @@ #include #include -#include #include #include #include #include #include #include -#include -#include #include @@ -34,36 +31,19 @@ extern const int UNSUPPORTED_PARAMETER; extern const int LOGICAL_ERROR; } // namespace ErrorCodes -inline void serializeToPacket(mpp::MPPDataPacket & packet, const tipb::SelectResponse & response) -{ - if (!response.SerializeToString(packet.mutable_data())) - throw Exception(fmt::format("Fail to serialize response, response size: {}", response.ByteSizeLong())); -} - -template -StreamingDAGResponseWriter::StreamingDAGResponseWriter( +template +StreamingDAGResponseWriter::StreamingDAGResponseWriter( StreamWriterPtr writer_, - std::vector partition_col_ids_, - TiDB::TiDBCollators collators_, - tipb::ExchangeType exchange_type_, Int64 records_per_chunk_, Int64 batch_send_min_limit_, bool should_send_exec_summary_at_last_, - DAGContext & dag_context_, - uint64_t fine_grained_shuffle_stream_count_, - UInt64 fine_grained_shuffle_batch_size_) + DAGContext & dag_context_) : DAGResponseWriter(records_per_chunk_, dag_context_) , batch_send_min_limit(batch_send_min_limit_) , should_send_exec_summary_at_last(should_send_exec_summary_at_last_) - , exchange_type(exchange_type_) , writer(writer_) - , partition_col_ids(std::move(partition_col_ids_)) - , collators(std::move(collators_)) - , 
fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count_) - , fine_grained_shuffle_batch_size(fine_grained_shuffle_batch_size_) { rows_in_blocks = 0; - partition_num = writer_->getPartitionNum(); switch (dag_context.encode_type) { case tipb::EncodeType::TypeDefault: @@ -75,43 +55,34 @@ StreamingDAGResponseWriter::Stream case tipb::EncodeType::TypeCHBlock: chunk_codec_stream = std::make_unique()->newCodecStream(dag_context.result_field_types); break; + default: + throw TiFlashException("Unsupported EncodeType", Errors::Coprocessor::Internal); } + /// For other encode types, we will use records_per_chunk to control the batch size sent. + batch_send_min_limit = dag_context.encode_type == tipb::EncodeType::TypeCHBlock + ? batch_send_min_limit + : (records_per_chunk - 1); } -template -void StreamingDAGResponseWriter::finishWrite() +template +void StreamingDAGResponseWriter::finishWrite() { if (should_send_exec_summary_at_last) { - if constexpr (enable_fine_grained_shuffle) - { - assert(exchange_type == tipb::ExchangeType::Hash); - batchWriteFineGrainedShuffle(); - } - else - { - batchWrite(); - } + encodeThenWriteBlocks(); } else { - if constexpr (enable_fine_grained_shuffle) - { - assert(exchange_type == tipb::ExchangeType::Hash); - batchWriteFineGrainedShuffle(); - } - else - { - batchWrite(); - } + encodeThenWriteBlocks(); } } -template -void StreamingDAGResponseWriter::write(const Block & block) +template +void StreamingDAGResponseWriter::write(const Block & block) { - if (block.columns() != dag_context.result_field_types.size()) - throw TiFlashException("Output column size mismatch with field type size", Errors::Coprocessor::Internal); + RUNTIME_CHECK_MSG( + block.columns() == dag_context.result_field_types.size(), + "Output column size mismatch with field type size"); size_t rows = block.rows(); rows_in_blocks += rows; if (rows > 0) @@ -119,85 +90,45 @@ void StreamingDAGResponseWriter::w blocks.push_back(block); } - if constexpr (enable_fine_grained_shuffle) - { - assert(exchange_type == tipb::ExchangeType::Hash); - if (static_cast(rows_in_blocks) >= fine_grained_shuffle_batch_size) - batchWriteFineGrainedShuffle(); - } - else - { - if (static_cast(rows_in_blocks) > (dag_context.encode_type == tipb::EncodeType::TypeCHBlock ? 
batch_send_min_limit : records_per_chunk - 1)) - batchWrite(); - } + if (static_cast(rows_in_blocks) > batch_send_min_limit) + encodeThenWriteBlocks(); } -template +template template -void StreamingDAGResponseWriter::encodeThenWriteBlocks( - const std::vector & input_blocks, - TrackedSelectResp & response) const +void StreamingDAGResponseWriter::encodeThenWriteBlocks() { - if (dag_context.encode_type == tipb::EncodeType::TypeCHBlock) + TrackedSelectResp response; + if constexpr (send_exec_summary_at_last) + addExecuteSummaries(response.getResponse(), /*delta_mode=*/true); + response.setEncodeType(dag_context.encode_type); + if (blocks.empty()) { - if (dag_context.isMPPTask()) /// broadcast data among TiFlash nodes in MPP - { - TrackedMppDataPacket tracked_packet(current_memory_tracker); - if constexpr (send_exec_summary_at_last) - { - tracked_packet.serializeByResponse(response.getResponse()); - } - if (input_blocks.empty()) - { - if constexpr (send_exec_summary_at_last) - { - writer->write(tracked_packet.getPacket()); - } - return; - } - for (const auto & block : input_blocks) - { - chunk_codec_stream->encode(block, 0, block.rows()); - tracked_packet.addChunk(chunk_codec_stream->getString()); - chunk_codec_stream->clear(); - } - writer->write(tracked_packet.getPacket()); - } - else /// passthrough data to a non-TiFlash node, like sending data to TiSpark + if constexpr (send_exec_summary_at_last) { - response.setEncodeType(dag_context.encode_type); - if (input_blocks.empty()) - { - if constexpr (send_exec_summary_at_last) - { - writer->write(response.getResponse()); - } - return; - } - for (const auto & block : input_blocks) - { - chunk_codec_stream->encode(block, 0, block.rows()); - response.addChunk(chunk_codec_stream->getString()); - chunk_codec_stream->clear(); - } writer->write(response.getResponse()); } + return; } - else /// passthrough data to a TiDB node + + if (dag_context.encode_type == tipb::EncodeType::TypeCHBlock) { - response.setEncodeType(dag_context.encode_type); - if (input_blocks.empty()) + /// passthrough data to a non-TiFlash node, like sending data to TiSpark + while (!blocks.empty()) { - if constexpr (send_exec_summary_at_last) - { - writer->write(response.getResponse()); - } - return; + const auto & block = blocks.back(); + chunk_codec_stream->encode(block, 0, block.rows()); + blocks.pop_back(); + response.addChunk(chunk_codec_stream->getString()); + chunk_codec_stream->clear(); } - + } + else /// passthrough data to a TiDB node + { Int64 current_records_num = 0; - for (const auto & block : input_blocks) + while (!blocks.empty()) { + const auto & block = blocks.back(); size_t rows = block.rows(); for (size_t row_index = 0; row_index < rows;) { @@ -212,6 +143,7 @@ void StreamingDAGResponseWriter::e current_records_num += (upper - row_index); row_index = upper; } + blocks.pop_back(); } if (current_records_num > 0) @@ -219,243 +151,13 @@ void StreamingDAGResponseWriter::e response.addChunk(chunk_codec_stream->getString()); chunk_codec_stream->clear(); } - writer->write(response.getResponse()); - } -} - - -template -template -void StreamingDAGResponseWriter::batchWrite() -{ - TrackedSelectResp response; - if constexpr (send_exec_summary_at_last) - addExecuteSummaries(response.getResponse(), !dag_context.isMPPTask() || dag_context.isRootMPPTask()); - if (exchange_type == tipb::ExchangeType::Hash) - { - partitionAndEncodeThenWriteBlocks(blocks, response); - } - else - { - encodeThenWriteBlocks(blocks, response); - } - blocks.clear(); - rows_in_blocks = 0; -} - -template 
-template -void StreamingDAGResponseWriter::handleExecSummary( - const std::vector & input_blocks, - std::vector & packets, - tipb::SelectResponse & response) const -{ - if constexpr (send_exec_summary_at_last) - { - /// Sending the response to only one node, default the first one. - packets[0].serializeByResponse(response); - - // No need to send data when blocks are not empty, - // because exec_summary will be sent together with blocks. - if (input_blocks.empty()) - { - for (auto part_id = 0; part_id < partition_num; ++part_id) - { - writer->write(packets[part_id].getPacket(), part_id); - } - } - } -} - -template -template -void StreamingDAGResponseWriter::writePackets( - const std::vector & responses_row_count, - std::vector & packets) const -{ - for (size_t part_id = 0; part_id < packets.size(); ++part_id) - { - if constexpr (send_exec_summary_at_last) - { - writer->write(packets[part_id].getPacket(), part_id); - } - else - { - if (responses_row_count[part_id] > 0) - writer->write(packets[part_id].getPacket(), part_id); - } - } -} - -inline void initInputBlocks(std::vector & input_blocks) -{ - for (auto & input_block : input_blocks) - { - for (size_t i = 0; i < input_block.columns(); ++i) - { - if (ColumnPtr converted = input_block.getByPosition(i).column->convertToFullColumnIfConst()) - input_block.getByPosition(i).column = converted; - } - } -} - -inline void initDestColumns(const Block & input_block, std::vector & dest_tbl_cols) -{ - for (auto & cols : dest_tbl_cols) - { - cols = input_block.cloneEmptyColumns(); - } -} - -void computeHash(const Block & input_block, - uint32_t bucket_num, - const TiDB::TiDBCollators & collators, - std::vector & partition_key_containers, - const std::vector & partition_col_ids, - std::vector> & result_columns) -{ - size_t rows = input_block.rows(); - WeakHash32 hash(rows); - - // get hash values by all partition key columns - for (size_t i = 0; i < partition_col_ids.size(); ++i) - { - input_block.getByPosition(partition_col_ids[i]).column->updateWeakHash32(hash, collators[i], partition_key_containers[i]); - } - - const auto & hash_data = hash.getData(); - - // partition each row - IColumn::Selector selector(rows); - for (size_t row = 0; row < rows; ++row) - { - /// Row from interval [(2^32 / bucket_num) * i, (2^32 / bucket_num) * (i + 1)) goes to bucket with number i. - selector[row] = hash_data[row]; /// [0, 2^32) - selector[row] *= bucket_num; /// [0, bucket_num * 2^32), selector stores 64 bit values. - selector[row] >>= 32u; /// [0, bucket_num) - } - - for (size_t col_id = 0; col_id < input_block.columns(); ++col_id) - { - // Scatter columns to different partitions - std::vector part_columns = input_block.getByPosition(col_id).column->scatter(bucket_num, selector); - assert(part_columns.size() == bucket_num); - for (size_t bucket_idx = 0; bucket_idx < bucket_num; ++bucket_idx) - { - result_columns[bucket_idx][col_id] = std::move(part_columns[bucket_idx]); - } - } -} - -/// Hash exchanging data among only TiFlash nodes. Only be called when enable_fine_grained_shuffle is false. 
-template -template -void StreamingDAGResponseWriter::partitionAndEncodeThenWriteBlocks( - std::vector & input_blocks, - TrackedSelectResp & response) const -{ - static_assert(!enable_fine_grained_shuffle); - std::vector tracked_packets(partition_num); - std::vector responses_row_count(partition_num); - handleExecSummary(input_blocks, tracked_packets, response.getResponse()); - if (input_blocks.empty()) - return; - - initInputBlocks(input_blocks); - Block dest_block = input_blocks[0].cloneEmpty(); - std::vector partition_key_containers(collators.size()); - for (const auto & block : input_blocks) - { - std::vector dest_tbl_cols(partition_num); - initDestColumns(block, dest_tbl_cols); - - computeHash(block, partition_num, collators, partition_key_containers, partition_col_ids, dest_tbl_cols); - - for (size_t part_id = 0; part_id < partition_num; ++part_id) - { - dest_block.setColumns(std::move(dest_tbl_cols[part_id])); - responses_row_count[part_id] += dest_block.rows(); - chunk_codec_stream->encode(dest_block, 0, dest_block.rows()); - tracked_packets[part_id].addChunk(chunk_codec_stream->getString()); - chunk_codec_stream->clear(); - } } - writePackets(responses_row_count, tracked_packets); -} - -/// Hash exchanging data among only TiFlash nodes. Only be called when enable_fine_grained_shuffle is true. -template -template -void StreamingDAGResponseWriter::batchWriteFineGrainedShuffle() -{ - static_assert(enable_fine_grained_shuffle); - assert(exchange_type == tipb::ExchangeType::Hash); - assert(fine_grained_shuffle_stream_count <= 1024); - - tipb::SelectResponse response; - if constexpr (send_exec_summary_at_last) - addExecuteSummaries(response, !dag_context.isMPPTask() || dag_context.isRootMPPTask()); - - std::vector tracked_packets(partition_num); - std::vector responses_row_count(partition_num, 0); - - // fine_grained_shuffle_stream_count is in [0, 1024], and partition_num is uint16_t, so will not overflow. - uint32_t bucket_num = partition_num * fine_grained_shuffle_stream_count; - handleExecSummary(blocks, tracked_packets, response); - if (!blocks.empty()) - { - std::vector final_dest_tbl_columns(bucket_num); - initInputBlocks(blocks); - initDestColumns(blocks[0], final_dest_tbl_columns); - - // Hash partition input_blocks into bucket_num. - for (const auto & block : blocks) - { - std::vector partition_key_containers(collators.size()); - std::vector dest_tbl_columns(bucket_num); - initDestColumns(block, dest_tbl_columns); - computeHash(block, bucket_num, collators, partition_key_containers, partition_col_ids, dest_tbl_columns); - for (size_t bucket_idx = 0; bucket_idx < bucket_num; ++bucket_idx) - { - for (size_t col_id = 0; col_id < block.columns(); ++col_id) - { - const MutableColumnPtr & src_col = dest_tbl_columns[bucket_idx][col_id]; - final_dest_tbl_columns[bucket_idx][col_id]->insertRangeFrom(*src_col, 0, src_col->size()); - } - } - } - - // For i-th stream_count buckets, send to i-th tiflash node. - for (size_t bucket_idx = 0; bucket_idx < bucket_num; bucket_idx += fine_grained_shuffle_stream_count) - { - size_t part_id = bucket_idx / fine_grained_shuffle_stream_count; // NOLINT(clang-analyzer-core.DivideZero) - size_t row_count_per_part = 0; - for (uint64_t stream_idx = 0; stream_idx < fine_grained_shuffle_stream_count; ++stream_idx) - { - Block dest_block = blocks[0].cloneEmpty(); - // For now we put all rows into one Block, may cause this Block too large. 
- dest_block.setColumns(std::move(final_dest_tbl_columns[bucket_idx + stream_idx])); - row_count_per_part += dest_block.rows(); - - chunk_codec_stream->encode(dest_block, 0, dest_block.rows()); - tracked_packets[part_id].addChunk(chunk_codec_stream->getString()); - tracked_packets[part_id].packet.add_stream_ids(stream_idx); - chunk_codec_stream->clear(); - } - responses_row_count[part_id] = row_count_per_part; - } - } - - writePackets(responses_row_count, tracked_packets); - - blocks.clear(); + assert(blocks.empty()); rows_in_blocks = 0; + writer->write(response.getResponse()); } -template class StreamingDAGResponseWriter; -template class StreamingDAGResponseWriter; -template class StreamingDAGResponseWriter; -template class StreamingDAGResponseWriter; - +template class StreamingDAGResponseWriter; +template class StreamingDAGResponseWriter; } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h index 1e37090509b..b1bbe5b4baa 100644 --- a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h +++ b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h @@ -18,70 +18,37 @@ #include #include #include -#include +#include #include -#include -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wunused-parameter" #include -#include -#include - -#pragma GCC diagnostic pop +#include namespace DB { -/// Serializes the stream of blocks and sends them to TiDB or TiFlash with different serialization paths. -/// When sending data to TiDB, blocks with extra info are written into tipb::SelectResponse, then the whole tipb::SelectResponse is further serialized into mpp::MPPDataPacket.data. -/// Differently when sending data to TiFlash, blocks with only tuples are directly serialized into mpp::MPPDataPacket.chunks, but for the last block, its extra info (like execution summaries) is written into tipb::SelectResponse, then further serialized into mpp::MPPDataPacket.data. -template +/// Serializes the stream of blocks and sends them to TiDB/TiSpark with different serialization paths. +template class StreamingDAGResponseWriter : public DAGResponseWriter { public: StreamingDAGResponseWriter( StreamWriterPtr writer_, - std::vector partition_col_ids_, - TiDB::TiDBCollators collators_, - tipb::ExchangeType exchange_type_, Int64 records_per_chunk_, Int64 batch_send_min_limit_, bool should_send_exec_summary_at_last, - DAGContext & dag_context_, - UInt64 fine_grained_shuffle_stream_count_, - UInt64 fine_grained_shuffle_batch_size); + DAGContext & dag_context_); void write(const Block & block) override; void finishWrite() override; private: template - void batchWrite(); - template - void batchWriteFineGrainedShuffle(); - - template - void encodeThenWriteBlocks(const std::vector & input_blocks, TrackedSelectResp & response) const; - template - void partitionAndEncodeThenWriteBlocks(std::vector & input_blocks, TrackedSelectResp & response) const; - - template - void handleExecSummary(const std::vector & input_blocks, - std::vector & packet, - tipb::SelectResponse & response) const; - template - void writePackets(const std::vector & responses_row_count, std::vector & packets) const; + void encodeThenWriteBlocks(); Int64 batch_send_min_limit; bool should_send_exec_summary_at_last; /// only one stream needs to sending execution summaries at last. 
- tipb::ExchangeType exchange_type; StreamWriterPtr writer; std::vector blocks; - std::vector partition_col_ids; - TiDB::TiDBCollators collators; size_t rows_in_blocks; - uint16_t partition_num; std::unique_ptr chunk_codec_stream; - UInt64 fine_grained_shuffle_stream_count; - UInt64 fine_grained_shuffle_batch_size; }; } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_streaming_dag_writer.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_streaming_dag_writer.cpp deleted file mode 100644 index 5d4186123b7..00000000000 --- a/dbms/src/Flash/Coprocessor/tests/gtest_streaming_dag_writer.cpp +++ /dev/null @@ -1,184 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include -#include -#include -#include - -#include -#include - -namespace DB -{ -namespace tests -{ - -using BlockPtr = std::shared_ptr; -class TestStreamingDAGResponseWriter : public testing::Test -{ -protected: - void SetUp() override - { - dag_context_ptr = std::make_unique(1024); - dag_context_ptr->encode_type = tipb::EncodeType::TypeCHBlock; - dag_context_ptr->is_mpp_task = true; - dag_context_ptr->is_root_mpp_task = false; - dag_context_ptr->result_field_types = makeFields(); - context.setDAGContext(dag_context_ptr.get()); - } - -public: - TestStreamingDAGResponseWriter() - : context(TiFlashTestEnv::getContext()) - , part_col_ids{0} - , part_col_collators{ - TiDB::ITiDBCollator::getCollator(TiDB::ITiDBCollator::BINARY)} - {} - - // Return 10 Int64 column. - static std::vector makeFields() - { - std::vector fields(10); - for (int i = 0; i < 10; ++i) - { - fields[i].set_tp(TiDB::TypeLongLong); - } - return fields; - } - - // Return a block with **rows** and 10 Int64 column. 
- static BlockPtr prepareBlock(const std::vector & rows) - { - BlockPtr block = std::make_shared(); - for (int i = 0; i < 10; ++i) - { - DataTypePtr int64_data_type = std::make_shared(); - DataTypePtr nullable_int64_data_type = std::make_shared(int64_data_type); - MutableColumnPtr int64_col = nullable_int64_data_type->createColumn(); - for (Int64 r : rows) - { - int64_col->insert(Field(r)); - } - block->insert(ColumnWithTypeAndName{std::move(int64_col), - nullable_int64_data_type, - String("col") + std::to_string(i)}); - } - return block; - } - - Context context; - std::vector part_col_ids; - TiDB::TiDBCollators part_col_collators; - - std::unique_ptr dag_context_ptr; -}; - -using MockStreamWriterChecker = std::function; - -struct MockStreamWriter -{ - MockStreamWriter(MockStreamWriterChecker checker_, - uint16_t part_num_) - : checker(checker_) - , part_num(part_num_) - {} - - void write(mpp::MPPDataPacket &) { FAIL() << "cannot reach here, because we only expect hash partition"; } - void write(mpp::MPPDataPacket & packet, uint16_t part_id) { checker(packet, part_id); } - void write(tipb::SelectResponse &, uint16_t) { FAIL() << "cannot reach here, only consider CH Block format"; } - void write(tipb::SelectResponse &) { FAIL() << "cannot reach here, only consider CH Block format"; } - uint16_t getPartitionNum() const { return part_num; } - -private: - MockStreamWriterChecker checker; - uint16_t part_num; -}; - -// Input block data is distributed uniform. -// partition_num: 4 -// fine_grained_shuffle_stream_count: 8 -TEST_F(TestStreamingDAGResponseWriter, testBatchWriteFineGrainedShuffle) -try -{ - const size_t block_rows = 1024; - const uint16_t part_num = 4; - const uint32_t fine_grained_shuffle_stream_count = 8; - const Int64 fine_grained_shuffle_batch_size = 4096; - - // Set these to 1, because when fine grained shuffle is enabled, - // batchWriteFineGrainedShuffle() only check fine_grained_shuffle_batch_size. - // records_per_chunk and batch_send_min_limit are useless. - const Int64 records_per_chunk = 1; - const Int64 batch_send_min_limit = 1; - const bool should_send_exec_summary_at_last = true; - - // 1. Build Block. - std::vector uniform_data_set; - for (size_t i = 0; i < block_rows; ++i) - { - uniform_data_set.push_back(i); - } - BlockPtr block = prepareBlock(uniform_data_set); - - // 2. Build MockStreamWriter. - std::unordered_map write_report; - auto checker = [&write_report](mpp::MPPDataPacket & packet, uint16_t part_id) { - auto res = write_report.insert({part_id, packet}); - // Should always insert succeed. - // Because block.rows(1024) < fine_grained_shuffle_batch_size(4096), - // batchWriteFineGrainedShuffle() only called once, so will only be one packet for each partition. - ASSERT_TRUE(res.second); - }; - auto mock_writer = std::make_shared(checker, part_num); - - // 3. Start to write. - auto dag_writer = std::make_shared, /*enable_fine_grained_shuffle=*/true>>( - mock_writer, - part_col_ids, - part_col_collators, - tipb::ExchangeType::Hash, - records_per_chunk, - batch_send_min_limit, - should_send_exec_summary_at_last, - *dag_context_ptr, - fine_grained_shuffle_stream_count, - fine_grained_shuffle_batch_size); - dag_writer->write(*block); - dag_writer->finishWrite(); - - // 4. Start to check write_report. 
- std::vector decoded_blocks; - ASSERT_EQ(write_report.size(), part_num); - for (const auto & ele : write_report) - { - const mpp::MPPDataPacket & packet = ele.second; - ASSERT_EQ(packet.chunks_size(), packet.stream_ids_size()); - for (int i = 0; i < packet.chunks_size(); ++i) - { - decoded_blocks.push_back(CHBlockChunkCodec::decode(packet.chunks(i), *block)); - } - } - ASSERT_EQ(decoded_blocks.size(), fine_grained_shuffle_stream_count * part_num); - for (const auto & block : decoded_blocks) - { - ASSERT_EQ(block.rows(), block_rows / (fine_grained_shuffle_stream_count * part_num)); - } -} -CATCH - -} // namespace tests -} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_streaming_writer.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_streaming_writer.cpp new file mode 100644 index 00000000000..2f0ceaf87a2 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/tests/gtest_streaming_writer.cpp @@ -0,0 +1,222 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace DB +{ +namespace tests +{ +class TestStreamingWriter : public testing::Test +{ +protected: + void SetUp() override + { + dag_context_ptr = std::make_unique(1024); + dag_context_ptr->is_mpp_task = true; + dag_context_ptr->is_root_mpp_task = true; + dag_context_ptr->result_field_types = makeFields(); + context.setDAGContext(dag_context_ptr.get()); + } + +public: + TestStreamingWriter() + : context(TiFlashTestEnv::getContext()) + {} + + // Return 10 Int64 column. + static std::vector makeFields() + { + std::vector fields(10); + for (int i = 0; i < 10; ++i) + { + fields[i].set_tp(TiDB::TypeLongLong); + fields[i].set_flag(TiDB::ColumnFlagNotNull); + } + return fields; + } + + static DAGSchema makeSchema() + { + auto fields = makeFields(); + DAGSchema schema; + for (size_t i = 0; i < fields.size(); ++i) + { + ColumnInfo info = TiDB::fieldTypeToColumnInfo(fields[i]); + schema.emplace_back(String("col") + std::to_string(i), std::move(info)); + } + return schema; + } + + // Return a block with **rows** and 10 Int64 column. 
+ static Block prepareBlock(size_t rows) + { + Block block; + for (size_t i = 0; i < 10; ++i) + { + DataTypePtr int64_data_type = std::make_shared(); + auto int64_column = ColumnGenerator::instance().generate({rows, "Int64", RANDOM}).column; + block.insert(ColumnWithTypeAndName{ + std::move(int64_column), + int64_data_type, + String("col") + std::to_string(i)}); + } + return block; + } + + Context context; + + std::unique_ptr dag_context_ptr; +}; + +using MockStreamWriterChecker = std::function; + +struct MockStreamWriter +{ + explicit MockStreamWriter(MockStreamWriterChecker checker_) + : checker(checker_) + {} + + void write(mpp::MPPDataPacket &) { FAIL() << "cannot reach here."; } + void write(mpp::MPPDataPacket &, uint16_t) { FAIL() << "cannot reach here."; } + void write(tipb::SelectResponse & response, uint16_t part_id) { checker(response, part_id); } + void write(tipb::SelectResponse & response) { checker(response, 0); } + uint16_t getPartitionNum() const { return 1; } + +private: + MockStreamWriterChecker checker; +}; + +TEST_F(TestStreamingWriter, testStreamingWriter) +try +{ + std::vector encode_types{ + tipb::EncodeType::TypeDefault, + tipb::EncodeType::TypeChunk, + tipb::EncodeType::TypeCHBlock}; + for (const auto & encode_type : encode_types) + { + dag_context_ptr->encode_type = encode_type; + + const size_t block_rows = 64; + const size_t block_num = 64; + const size_t batch_send_min_limit = 108; + + const bool should_send_exec_summary_at_last = true; + + // 1. Build Blocks. + std::vector blocks; + for (size_t i = 0; i < block_num; ++i) + { + blocks.emplace_back(prepareBlock(block_rows)); + blocks.emplace_back(prepareBlock(0)); + } + Block header = blocks.back(); + + // 2. Build MockStreamWriter. + std::vector write_report; + auto checker = [&write_report](tipb::SelectResponse & response, uint16_t part_id) { + ASSERT_EQ(part_id, 0); + write_report.emplace_back(std::move(response)); + }; + auto mock_writer = std::make_shared(checker); + + // 3. Start to write. + auto dag_writer = std::make_shared>>( + mock_writer, + batch_send_min_limit, + batch_send_min_limit, + should_send_exec_summary_at_last, + *dag_context_ptr); + for (const auto & block : blocks) + dag_writer->write(block); + dag_writer->finishWrite(); + + // 4. Start to check write_report. 
+ size_t expect_rows = block_rows * block_num; + size_t decoded_block_rows = 0; + for (const auto & resp : write_report) + { + for (int i = 0; i < resp.chunks_size(); ++i) + { + const tipb::Chunk & chunk = resp.chunks(i); + Block decoded_block; + switch (encode_type) + { + case tipb::EncodeType::TypeDefault: + decoded_block = DefaultChunkCodec().decode(chunk.rows_data(), makeSchema()); + break; + case tipb::EncodeType::TypeChunk: + decoded_block = ArrowChunkCodec().decode(chunk.rows_data(), makeSchema()); + break; + case tipb::EncodeType::TypeCHBlock: + decoded_block = CHBlockChunkCodec::decode(chunk.rows_data(), header); + break; + } + decoded_block_rows += decoded_block.rows(); + } + } + ASSERT_EQ(decoded_block_rows, expect_rows); + } +} +CATCH + +TEST_F(TestStreamingWriter, emptyBlock) +try +{ + std::vector encode_types{ + tipb::EncodeType::TypeDefault, + tipb::EncodeType::TypeChunk, + tipb::EncodeType::TypeCHBlock}; + for (const auto & encode_type : encode_types) + { + dag_context_ptr->encode_type = encode_type; + + std::vector write_report; + auto checker = [&write_report](tipb::SelectResponse & response, uint16_t part_id) { + ASSERT_EQ(part_id, 0); + write_report.emplace_back(std::move(response)); + }; + auto mock_writer = std::make_shared(checker); + + const size_t batch_send_min_limit = 5; + const bool should_send_exec_summary_at_last = true; + auto dag_writer = std::make_shared>>( + mock_writer, + batch_send_min_limit, + batch_send_min_limit, + should_send_exec_summary_at_last, + *dag_context_ptr); + dag_writer->finishWrite(); + + // For `should_send_exec_summary_at_last = true`, there is at least one packet used to pass execution summary. + ASSERT_EQ(write_report.size(), 1); + ASSERT_EQ(write_report.back().chunks_size(), 0); + } +} +CATCH + +} // namespace tests +} // namespace DB diff --git a/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp b/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp new file mode 100644 index 00000000000..69ec6f94a12 --- /dev/null +++ b/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp @@ -0,0 +1,102 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include +#include +#include + +namespace DB +{ +template +BroadcastOrPassThroughWriter::BroadcastOrPassThroughWriter( + StreamWriterPtr writer_, + Int64 batch_send_min_limit_, + bool should_send_exec_summary_at_last_, + DAGContext & dag_context_) + : DAGResponseWriter(/*records_per_chunk=*/-1, dag_context_) + , batch_send_min_limit(batch_send_min_limit_) + , should_send_exec_summary_at_last(should_send_exec_summary_at_last_) + , writer(writer_) +{ + rows_in_blocks = 0; + RUNTIME_CHECK(dag_context.encode_type == tipb::EncodeType::TypeCHBlock); + chunk_codec_stream = std::make_unique()->newCodecStream(dag_context.result_field_types); +} + +template +void BroadcastOrPassThroughWriter::finishWrite() +{ + if (should_send_exec_summary_at_last) + { + encodeThenWriteBlocks(); + } + else + { + encodeThenWriteBlocks(); + } +} + +template +void BroadcastOrPassThroughWriter::write(const Block & block) +{ + RUNTIME_CHECK_MSG( + block.columns() == dag_context.result_field_types.size(), + "Output column size mismatch with field type size"); + size_t rows = block.rows(); + rows_in_blocks += rows; + if (rows > 0) + { + blocks.push_back(block); + } + + if (static_cast(rows_in_blocks) > batch_send_min_limit) + encodeThenWriteBlocks(); +} + +template +template +void BroadcastOrPassThroughWriter::encodeThenWriteBlocks() +{ + TrackedMppDataPacket tracked_packet(current_memory_tracker); + if constexpr (send_exec_summary_at_last) + { + TrackedSelectResp response; + addExecuteSummaries(response.getResponse(), /*delta_mode=*/false); + tracked_packet.serializeByResponse(response.getResponse()); + } + if (blocks.empty()) + { + if constexpr (send_exec_summary_at_last) + { + writer->write(tracked_packet.getPacket()); + } + return; + } + while (!blocks.empty()) + { + const auto & block = blocks.back(); + chunk_codec_stream->encode(block, 0, block.rows()); + blocks.pop_back(); + tracked_packet.addChunk(chunk_codec_stream->getString()); + chunk_codec_stream->clear(); + } + assert(blocks.empty()); + rows_in_blocks = 0; + writer->write(tracked_packet.getPacket()); +} + +template class BroadcastOrPassThroughWriter; + +} // namespace DB diff --git a/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.h b/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.h new file mode 100644 index 00000000000..647d9b8260a --- /dev/null +++ b/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.h @@ -0,0 +1,49 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include +#include +#include +#include +#include + +namespace DB +{ +template +class BroadcastOrPassThroughWriter : public DAGResponseWriter +{ +public: + BroadcastOrPassThroughWriter( + StreamWriterPtr writer_, + Int64 batch_send_min_limit_, + bool should_send_exec_summary_at_last, + DAGContext & dag_context_); + void write(const Block & block) override; + void finishWrite() override; + +private: + template + void encodeThenWriteBlocks(); + + Int64 batch_send_min_limit; + bool should_send_exec_summary_at_last; + StreamWriterPtr writer; + std::vector blocks; + size_t rows_in_blocks; + std::unique_ptr chunk_codec_stream; +}; + +} // namespace DB diff --git a/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp new file mode 100644 index 00000000000..4c21468d09d --- /dev/null +++ b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp @@ -0,0 +1,167 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include + +namespace DB +{ +template +FineGrainedShuffleWriter::FineGrainedShuffleWriter( + StreamWriterPtr writer_, + std::vector partition_col_ids_, + TiDB::TiDBCollators collators_, + bool should_send_exec_summary_at_last_, + DAGContext & dag_context_, + uint64_t fine_grained_shuffle_stream_count_, + UInt64 fine_grained_shuffle_batch_size_) + : DAGResponseWriter(/*records_per_chunk=*/-1, dag_context_) + , should_send_exec_summary_at_last(should_send_exec_summary_at_last_) + , writer(writer_) + , partition_col_ids(std::move(partition_col_ids_)) + , collators(std::move(collators_)) + , fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count_) + , fine_grained_shuffle_batch_size(fine_grained_shuffle_batch_size_) +{ + rows_in_blocks = 0; + partition_num = writer_->getPartitionNum(); + RUNTIME_CHECK(partition_num > 0); + RUNTIME_CHECK(dag_context.encode_type == tipb::EncodeType::TypeCHBlock); + chunk_codec_stream = std::make_unique()->newCodecStream(dag_context.result_field_types); +} + +template +void FineGrainedShuffleWriter::finishWrite() +{ + if (should_send_exec_summary_at_last) + { + batchWriteFineGrainedShuffle(); + } + else + { + batchWriteFineGrainedShuffle(); + } +} + +template +void FineGrainedShuffleWriter::write(const Block & block) +{ + RUNTIME_CHECK_MSG( + block.columns() == dag_context.result_field_types.size(), + "Output column size mismatch with field type size"); + size_t rows = block.rows(); + rows_in_blocks += rows; + if (rows > 0) + { + blocks.push_back(block); + } + + if (static_cast(rows_in_blocks) >= fine_grained_shuffle_batch_size) + batchWriteFineGrainedShuffle(); +} + +template +template +void FineGrainedShuffleWriter::batchWriteFineGrainedShuffle() +{ + std::vector tracked_packets(partition_num); + + if (!blocks.empty()) + { + assert(rows_in_blocks > 0); + assert(fine_grained_shuffle_stream_count <= 1024); + + // fine_grained_shuffle_stream_count is in (0, 1024], and partition_num is uint16_t, so will not overflow. 
+ uint32_t bucket_num = partition_num * fine_grained_shuffle_stream_count; + + HashBaseWriterHelper::materializeBlocks(blocks); + auto final_dest_tbl_columns = HashBaseWriterHelper::createDestColumns(blocks[0], bucket_num); + Block dest_block = blocks[0].cloneEmpty(); + + // Hash partition input_blocks into bucket_num. + while (!blocks.empty()) + { + const auto & block = blocks.back(); + size_t columns = block.columns(); + std::vector partition_key_containers(collators.size()); + auto dest_tbl_columns = HashBaseWriterHelper::createDestColumns(block, bucket_num); + HashBaseWriterHelper::computeHash(block, bucket_num, collators, partition_key_containers, partition_col_ids, dest_tbl_columns); + blocks.pop_back(); + + for (size_t bucket_idx = 0; bucket_idx < bucket_num; ++bucket_idx) + { + for (size_t col_id = 0; col_id < columns; ++col_id) + { + const MutableColumnPtr & src_col = dest_tbl_columns[bucket_idx][col_id]; + final_dest_tbl_columns[bucket_idx][col_id]->insertRangeFrom(*src_col, 0, src_col->size()); + } + } + } + assert(blocks.empty()); + rows_in_blocks = 0; + + // For i-th stream_count buckets, send to i-th tiflash node. + for (size_t bucket_idx = 0; bucket_idx < bucket_num; bucket_idx += fine_grained_shuffle_stream_count) + { + size_t part_id = bucket_idx / fine_grained_shuffle_stream_count; // NOLINT(clang-analyzer-core.DivideZero) + for (uint64_t stream_idx = 0; stream_idx < fine_grained_shuffle_stream_count; ++stream_idx) + { + // For now we put all rows into one Block, may cause this Block too large. + dest_block.setColumns(std::move(final_dest_tbl_columns[bucket_idx + stream_idx])); + size_t dest_block_rows = dest_block.rows(); + if (dest_block_rows > 0) + { + chunk_codec_stream->encode(dest_block, 0, dest_block_rows); + tracked_packets[part_id].addChunk(chunk_codec_stream->getString()); + chunk_codec_stream->clear(); + tracked_packets[part_id].packet.add_stream_ids(stream_idx); + } + } + } + } + + writePackets(tracked_packets); +} + +template +template +void FineGrainedShuffleWriter::writePackets(std::vector & packets) +{ + size_t part_id = 0; + + if constexpr (send_exec_summary_at_last) + { + tipb::SelectResponse response; + addExecuteSummaries(response, /*delta_mode=*/false); + /// Sending the response to only one node, default the first one. + assert(!packets.empty()); + packets[0].serializeByResponse(response); + writer->write(packets[0].getPacket(), 0); + part_id = 1; + } + + for (; part_id < packets.size(); ++part_id) + { + auto & packet = packets[part_id].getPacket(); + if (packet.chunks_size() > 0) + writer->write(packet, part_id); + } +} + +template class FineGrainedShuffleWriter; + +} // namespace DB diff --git a/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h new file mode 100644 index 00000000000..0b29187b146 --- /dev/null +++ b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h @@ -0,0 +1,59 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include +#include +#include +#include +#include + +namespace DB +{ +template +class FineGrainedShuffleWriter : public DAGResponseWriter +{ +public: + FineGrainedShuffleWriter( + StreamWriterPtr writer_, + std::vector partition_col_ids_, + TiDB::TiDBCollators collators_, + bool should_send_exec_summary_at_last, + DAGContext & dag_context_, + UInt64 fine_grained_shuffle_stream_count_, + UInt64 fine_grained_shuffle_batch_size); + void write(const Block & block) override; + void finishWrite() override; + +private: + template + void batchWriteFineGrainedShuffle(); + + template + void writePackets(std::vector & packets); + + bool should_send_exec_summary_at_last; + StreamWriterPtr writer; + std::vector blocks; + std::vector partition_col_ids; + TiDB::TiDBCollators collators; + size_t rows_in_blocks; + uint16_t partition_num; + std::unique_ptr chunk_codec_stream; + UInt64 fine_grained_shuffle_stream_count; + UInt64 fine_grained_shuffle_batch_size; +}; + +} // namespace DB diff --git a/dbms/src/Flash/Mpp/HashBaseWriterHelper.cpp b/dbms/src/Flash/Mpp/HashBaseWriterHelper.cpp new file mode 100644 index 00000000000..09f8c9f31f9 --- /dev/null +++ b/dbms/src/Flash/Mpp/HashBaseWriterHelper.cpp @@ -0,0 +1,80 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +namespace DB::HashBaseWriterHelper +{ +void materializeBlocks(std::vector & blocks) +{ + for (auto & block : blocks) + { + for (size_t i = 0; i < block.columns(); ++i) + { + auto & element = block.getByPosition(i); + auto & src = element.column; + if (ColumnPtr converted = src->convertToFullColumnIfConst()) + src = converted; + } + } +} + +std::vector createDestColumns(const Block & sample_block, size_t num) +{ + std::vector dest_tbl_cols(num); + for (auto & cols : dest_tbl_cols) + cols = sample_block.cloneEmptyColumns(); + return dest_tbl_cols; +} + +void computeHash(const Block & input_block, + uint32_t bucket_num, + const TiDB::TiDBCollators & collators, + std::vector & partition_key_containers, + const std::vector & partition_col_ids, + std::vector> & result_columns) +{ + size_t rows = input_block.rows(); + WeakHash32 hash(rows); + + // get hash values by all partition key columns + for (size_t i = 0; i < partition_col_ids.size(); ++i) + { + input_block.getByPosition(partition_col_ids[i]).column->updateWeakHash32(hash, collators[i], partition_key_containers[i]); + } + + const auto & hash_data = hash.getData(); + + // partition each row + IColumn::Selector selector(rows); + for (size_t row = 0; row < rows; ++row) + { + /// Row from interval [(2^32 / bucket_num) * i, (2^32 / bucket_num) * (i + 1)) goes to bucket with number i. + selector[row] = hash_data[row]; /// [0, 2^32) + selector[row] *= bucket_num; /// [0, bucket_num * 2^32), selector stores 64 bit values. 
+ selector[row] >>= 32u; /// [0, bucket_num) + } + + for (size_t col_id = 0; col_id < input_block.columns(); ++col_id) + { + // Scatter columns to different partitions + std::vector part_columns = input_block.getByPosition(col_id).column->scatter(bucket_num, selector); + assert(part_columns.size() == bucket_num); + for (size_t bucket_idx = 0; bucket_idx < bucket_num; ++bucket_idx) + { + result_columns[bucket_idx][col_id] = std::move(part_columns[bucket_idx]); + } + } +} +} // namespace DB::HashBaseWriterHelper diff --git a/dbms/src/Flash/Mpp/HashBaseWriterHelper.h b/dbms/src/Flash/Mpp/HashBaseWriterHelper.h new file mode 100644 index 00000000000..be82f162747 --- /dev/null +++ b/dbms/src/Flash/Mpp/HashBaseWriterHelper.h @@ -0,0 +1,32 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +namespace DB::HashBaseWriterHelper +{ +void materializeBlocks(std::vector & input_blocks); + +std::vector createDestColumns(const Block & sample_block, size_t num); + +void computeHash(const Block & input_block, + uint32_t bucket_num, + const TiDB::TiDBCollators & collators, + std::vector & partition_key_containers, + const std::vector & partition_col_ids, + std::vector> & result_columns); +} // namespace DB::HashBaseWriterHelper diff --git a/dbms/src/Flash/Mpp/HashParitionWriter.h b/dbms/src/Flash/Mpp/HashParitionWriter.h new file mode 100644 index 00000000000..4efa73e56aa --- /dev/null +++ b/dbms/src/Flash/Mpp/HashParitionWriter.h @@ -0,0 +1,57 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include +#include +#include +#include +#include + +namespace DB +{ +template +class HashPartitionWriter : public DAGResponseWriter +{ +public: + HashPartitionWriter( + StreamWriterPtr writer_, + std::vector partition_col_ids_, + TiDB::TiDBCollators collators_, + Int64 batch_send_min_limit_, + bool should_send_exec_summary_at_last, + DAGContext & dag_context_); + void write(const Block & block) override; + void finishWrite() override; + +private: + template + void partitionAndEncodeThenWriteBlocks(); + + template + void writePackets(std::vector & packets); + + Int64 batch_send_min_limit; + bool should_send_exec_summary_at_last; + StreamWriterPtr writer; + std::vector blocks; + std::vector partition_col_ids; + TiDB::TiDBCollators collators; + size_t rows_in_blocks; + uint16_t partition_num; + std::unique_ptr chunk_codec_stream; +}; + +} // namespace DB diff --git a/dbms/src/Flash/Mpp/HashPartitionWriter.cpp b/dbms/src/Flash/Mpp/HashPartitionWriter.cpp new file mode 100644 index 00000000000..8a13acdb0f0 --- /dev/null +++ b/dbms/src/Flash/Mpp/HashPartitionWriter.cpp @@ -0,0 +1,142 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include + +namespace DB +{ +template +HashPartitionWriter::HashPartitionWriter( + StreamWriterPtr writer_, + std::vector partition_col_ids_, + TiDB::TiDBCollators collators_, + Int64 batch_send_min_limit_, + bool should_send_exec_summary_at_last_, + DAGContext & dag_context_) + : DAGResponseWriter(/*records_per_chunk=*/-1, dag_context_) + , batch_send_min_limit(batch_send_min_limit_) + , should_send_exec_summary_at_last(should_send_exec_summary_at_last_) + , writer(writer_) + , partition_col_ids(std::move(partition_col_ids_)) + , collators(std::move(collators_)) +{ + rows_in_blocks = 0; + partition_num = writer_->getPartitionNum(); + RUNTIME_CHECK(partition_num > 0); + RUNTIME_CHECK(dag_context.encode_type == tipb::EncodeType::TypeCHBlock); + chunk_codec_stream = std::make_unique()->newCodecStream(dag_context.result_field_types); +} + +template +void HashPartitionWriter::finishWrite() +{ + if (should_send_exec_summary_at_last) + { + partitionAndEncodeThenWriteBlocks(); + } + else + { + partitionAndEncodeThenWriteBlocks(); + } +} + +template +void HashPartitionWriter::write(const Block & block) +{ + RUNTIME_CHECK_MSG( + block.columns() == dag_context.result_field_types.size(), + "Output column size mismatch with field type size"); + size_t rows = block.rows(); + rows_in_blocks += rows; + if (rows > 0) + { + blocks.push_back(block); + } + + if (static_cast(rows_in_blocks) > batch_send_min_limit) + partitionAndEncodeThenWriteBlocks(); +} + +template +template +void HashPartitionWriter::partitionAndEncodeThenWriteBlocks() +{ + std::vector tracked_packets(partition_num); + + if (!blocks.empty()) + { + assert(rows_in_blocks > 0); + + HashBaseWriterHelper::materializeBlocks(blocks); + Block dest_block = blocks[0].cloneEmpty(); + std::vector 
partition_key_containers(collators.size()); + + while (!blocks.empty()) + { + const auto & block = blocks.back(); + auto dest_tbl_cols = HashBaseWriterHelper::createDestColumns(block, partition_num); + HashBaseWriterHelper::computeHash(block, partition_num, collators, partition_key_containers, partition_col_ids, dest_tbl_cols); + blocks.pop_back(); + + for (size_t part_id = 0; part_id < partition_num; ++part_id) + { + dest_block.setColumns(std::move(dest_tbl_cols[part_id])); + size_t dest_block_rows = dest_block.rows(); + if (dest_block_rows > 0) + { + chunk_codec_stream->encode(dest_block, 0, dest_block_rows); + tracked_packets[part_id].addChunk(chunk_codec_stream->getString()); + chunk_codec_stream->clear(); + } + } + } + assert(blocks.empty()); + rows_in_blocks = 0; + } + + writePackets(tracked_packets); +} + +template +template +void HashPartitionWriter::writePackets(std::vector & packets) +{ + size_t part_id = 0; + + if constexpr (send_exec_summary_at_last) + { + tipb::SelectResponse response; + addExecuteSummaries(response, /*delta_mode=*/false); + /// Sending the response to only one node, default the first one. + assert(!packets.empty()); + packets[0].serializeByResponse(response); + writer->write(packets[0].getPacket(), 0); + part_id = 1; + } + + for (; part_id < packets.size(); ++part_id) + { + auto & packet = packets[part_id].getPacket(); + if (packet.chunks_size() > 0) + writer->write(packet, part_id); + } +} + +template class HashPartitionWriter; + +} // namespace DB diff --git a/dbms/src/Flash/Mpp/newMPPExchangeWriter.h b/dbms/src/Flash/Mpp/newMPPExchangeWriter.h new file mode 100644 index 00000000000..541ba94498c --- /dev/null +++ b/dbms/src/Flash/Mpp/newMPPExchangeWriter.h @@ -0,0 +1,87 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
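+
+// newMPPExchangeWriter chooses the concrete DAGResponseWriter for an MPP exchange sender:
+// a root MPP task always uses a PassThrough StreamingDAGResponseWriter; a non-root task uses a
+// FineGrainedShuffleWriter or HashPartitionWriter for Hash exchange (depending on whether
+// fine-grained shuffle is enabled), and a BroadcastOrPassThroughWriter for the other exchange types.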
+ +#pragma once + +#include +#include +#include +#include + +namespace DB +{ +template +std::unique_ptr newMPPExchangeWriter( + const StreamWriterPtr & writer, + const std::vector & partition_col_ids, + const TiDB::TiDBCollators & partition_col_collators, + const tipb::ExchangeType & exchange_type, + Int64 records_per_chunk, + Int64 batch_send_min_limit, + bool should_send_exec_summary_at_last, + DAGContext & dag_context, + bool enable_fine_grained_shuffle, + UInt64 fine_grained_shuffle_stream_count, + UInt64 fine_grained_shuffle_batch_size) +{ + RUNTIME_CHECK(dag_context.isMPPTask()); + if (dag_context.isRootMPPTask()) + { + RUNTIME_CHECK(!enable_fine_grained_shuffle); + RUNTIME_CHECK(exchange_type == tipb::ExchangeType::PassThrough); + return std::make_unique>( + writer, + records_per_chunk, + batch_send_min_limit, + should_send_exec_summary_at_last, + dag_context); + } + else + { + if (exchange_type == tipb::ExchangeType::Hash) + { + if (enable_fine_grained_shuffle) + { + return std::make_unique>( + writer, + partition_col_ids, + partition_col_collators, + should_send_exec_summary_at_last, + dag_context, + fine_grained_shuffle_stream_count, + fine_grained_shuffle_batch_size); + } + else + { + return std::make_unique>( + writer, + partition_col_ids, + partition_col_collators, + batch_send_min_limit, + should_send_exec_summary_at_last, + dag_context); + } + } + else + { + RUNTIME_CHECK(!enable_fine_grained_shuffle); + return std::make_unique>( + writer, + batch_send_min_limit, + should_send_exec_summary_at_last, + dag_context); + } + } +} +} // namespace DB diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp new file mode 100644 index 00000000000..0d21290aea5 --- /dev/null +++ b/dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp @@ -0,0 +1,455 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace DB +{ +namespace tests +{ +class TestMPPExchangeWriter : public testing::Test +{ +protected: + void SetUp() override + { + dag_context_ptr = std::make_unique(1024); + dag_context_ptr->encode_type = tipb::EncodeType::TypeCHBlock; + dag_context_ptr->is_mpp_task = true; + dag_context_ptr->is_root_mpp_task = false; + dag_context_ptr->result_field_types = makeFields(); + context.setDAGContext(dag_context_ptr.get()); + } + +public: + TestMPPExchangeWriter() + : context(TiFlashTestEnv::getContext()) + , part_col_ids{0} + , part_col_collators{ + TiDB::ITiDBCollator::getCollator(TiDB::ITiDBCollator::BINARY)} + {} + + // Return 10 Int64 column. + static std::vector makeFields() + { + std::vector fields(10); + for (int i = 0; i < 10; ++i) + { + fields[i].set_tp(TiDB::TypeLongLong); + fields[i].set_flag(TiDB::ColumnFlagNotNull); + } + return fields; + } + + // Return a block with **rows** and 10 Int64 column. 
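+    // "Uniform" means each of the 10 columns holds the values 0, 1, ..., rows-1 in order,
+    // so hash partitioning on the first column is expected to spread the rows evenly across buckets.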
+ static Block prepareUniformBlock(size_t rows) + { + std::vector uniform_data_set; + for (size_t i = 0; i < rows; ++i) + { + uniform_data_set.push_back(i); + } + Block block; + for (int i = 0; i < 10; ++i) + { + DataTypePtr int64_data_type = std::make_shared(); + MutableColumnPtr int64_col = int64_data_type->createColumn(); + for (Int64 r : uniform_data_set) + { + int64_col->insert(Field(r)); + } + block.insert(ColumnWithTypeAndName{ + std::move(int64_col), + int64_data_type, + String("col") + std::to_string(i)}); + } + return block; + } + + // Return a block with **rows** and 10 Int64 column. + static Block prepareRandomBlock(size_t rows) + { + Block block; + for (size_t i = 0; i < 10; ++i) + { + DataTypePtr int64_data_type = std::make_shared(); + auto int64_column = ColumnGenerator::instance().generate({rows, "Int64", RANDOM}).column; + block.insert(ColumnWithTypeAndName{ + std::move(int64_column), + int64_data_type, + String("col") + std::to_string(i)}); + } + return block; + } + + Context context; + std::vector part_col_ids; + TiDB::TiDBCollators part_col_collators; + + std::unique_ptr dag_context_ptr; +}; + +using MockStreamWriterChecker = std::function; + +struct MockStreamWriter +{ + MockStreamWriter(MockStreamWriterChecker checker_, + uint16_t part_num_) + : checker(checker_) + , part_num(part_num_) + {} + + void write(mpp::MPPDataPacket & packet) { checker(packet, 0); } + void write(mpp::MPPDataPacket & packet, uint16_t part_id) { checker(packet, part_id); } + void write(tipb::SelectResponse &, uint16_t) { FAIL() << "cannot reach here, only consider CH Block format"; } + void write(tipb::SelectResponse &) { FAIL() << "cannot reach here, only consider CH Block format"; } + uint16_t getPartitionNum() const { return part_num; } + +private: + MockStreamWriterChecker checker; + uint16_t part_num; +}; + +// Input block data is distributed uniform. +// partition_num: 4 +// fine_grained_shuffle_stream_count: 8 +TEST_F(TestMPPExchangeWriter, testBatchWriteFineGrainedShuffle) +try +{ + const size_t block_rows = 1024; + const uint16_t part_num = 4; + const uint32_t fine_grained_shuffle_stream_count = 8; + const Int64 fine_grained_shuffle_batch_size = 4096; + + const bool should_send_exec_summary_at_last = true; + + // 1. Build Block. + auto block = prepareUniformBlock(block_rows); + + // 2. Build MockStreamWriter. + std::unordered_map write_report; + auto checker = [&write_report](mpp::MPPDataPacket & packet, uint16_t part_id) { + auto res = write_report.insert({part_id, packet}); + // Should always insert succeed. + // Because block.rows(1024) < fine_grained_shuffle_batch_size(4096), + // batchWriteFineGrainedShuffle() only called once, so will only be one packet for each partition. + ASSERT_TRUE(res.second); + }; + auto mock_writer = std::make_shared(checker, part_num); + + // 3. Start to write. + auto dag_writer = std::make_shared>>( + mock_writer, + part_col_ids, + part_col_collators, + should_send_exec_summary_at_last, + *dag_context_ptr, + fine_grained_shuffle_stream_count, + fine_grained_shuffle_batch_size); + dag_writer->write(block); + dag_writer->finishWrite(); + + // 4. Start to check write_report. 
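+    // With 1024 uniform rows hashed into part_num(4) * fine_grained_shuffle_stream_count(8) = 32 buckets,
+    // every (partition, stream) pair should decode to exactly 1024 / 32 = 32 rows.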
+ std::vector decoded_blocks; + ASSERT_EQ(write_report.size(), part_num); + for (const auto & ele : write_report) + { + const mpp::MPPDataPacket & packet = ele.second; + ASSERT_EQ(packet.chunks_size(), packet.stream_ids_size()); + for (int i = 0; i < packet.chunks_size(); ++i) + { + decoded_blocks.push_back(CHBlockChunkCodec::decode(packet.chunks(i), block)); + } + } + ASSERT_EQ(decoded_blocks.size(), fine_grained_shuffle_stream_count * part_num); + for (const auto & block : decoded_blocks) + { + ASSERT_EQ(block.rows(), block_rows / (fine_grained_shuffle_stream_count * part_num)); + } +} +CATCH + +TEST_F(TestMPPExchangeWriter, testFineGrainedShuffleWriter) +try +{ + const size_t block_rows = 64; + const size_t block_num = 64; + const uint16_t part_num = 4; + const uint32_t fine_grained_shuffle_stream_count = 8; + const Int64 fine_grained_shuffle_batch_size = 108; + + const bool should_send_exec_summary_at_last = true; + + // 1. Build Block. + std::vector blocks; + for (size_t i = 0; i < block_num; ++i) + { + blocks.emplace_back(prepareUniformBlock(block_rows)); + blocks.emplace_back(prepareUniformBlock(0)); + } + Block header = blocks.back(); + + // 2. Build MockStreamWriter. + std::unordered_map> write_report; + auto checker = [&write_report](mpp::MPPDataPacket & packet, uint16_t part_id) { + write_report[part_id].emplace_back(std::move(packet)); + }; + auto mock_writer = std::make_shared(checker, part_num); + + // 3. Start to write. + auto dag_writer = std::make_shared>>( + mock_writer, + part_col_ids, + part_col_collators, + should_send_exec_summary_at_last, + *dag_context_ptr, + fine_grained_shuffle_stream_count, + fine_grained_shuffle_batch_size); + for (const auto & block : blocks) + dag_writer->write(block); + dag_writer->finishWrite(); + + // 4. Start to check write_report. + size_t per_part_rows = block_rows * block_num / part_num; + ASSERT_EQ(write_report.size(), part_num); + std::vector rows_of_stream_ids(fine_grained_shuffle_stream_count, 0); + for (const auto & ele : write_report) + { + size_t part_decoded_block_rows = 0; + for (const auto & packet : ele.second) + { + ASSERT_EQ(packet.chunks_size(), packet.stream_ids_size()); + for (int i = 0; i < packet.chunks_size(); ++i) + { + auto decoded_block = CHBlockChunkCodec::decode(packet.chunks(i), header); + part_decoded_block_rows += decoded_block.rows(); + rows_of_stream_ids[packet.stream_ids(i)] += decoded_block.rows(); + } + } + ASSERT_EQ(part_decoded_block_rows, per_part_rows); + } + size_t per_stream_id_rows = block_rows * block_num / fine_grained_shuffle_stream_count; + for (size_t rows : rows_of_stream_ids) + ASSERT_EQ(rows, per_stream_id_rows); +} +CATCH + +TEST_F(TestMPPExchangeWriter, emptyBlockForFineGrainedShuffleWriter) +try +{ + const uint16_t part_num = 4; + const uint32_t fine_grained_shuffle_stream_count = 8; + const Int64 fine_grained_shuffle_batch_size = 4096; + const bool should_send_exec_summary_at_last = true; + + std::unordered_map write_report; + auto checker = [&write_report](mpp::MPPDataPacket & packet, uint16_t part_id) { + auto res = write_report.insert({part_id, packet}); + // Should always insert succeed. There is at most one packet per partition. 
+ ASSERT_TRUE(res.second); + }; + auto mock_writer = std::make_shared(checker, part_num); + + auto dag_writer = std::make_shared>>( + mock_writer, + part_col_ids, + part_col_collators, + should_send_exec_summary_at_last, + *dag_context_ptr, + fine_grained_shuffle_stream_count, + fine_grained_shuffle_batch_size); + dag_writer->finishWrite(); + + // For `should_send_exec_summary_at_last = true`, there is at least one packet used to pass execution summary. + ASSERT_EQ(write_report.size(), 1); + ASSERT_EQ(write_report.cbegin()->second.chunks_size(), 0); +} +CATCH + +TEST_F(TestMPPExchangeWriter, testHashPartitionWriter) +try +{ + const size_t block_rows = 64; + const size_t block_num = 64; + const size_t batch_send_min_limit = 108; + const uint16_t part_num = 4; + + const bool should_send_exec_summary_at_last = true; + + // 1. Build Blocks. + std::vector blocks; + for (size_t i = 0; i < block_num; ++i) + { + blocks.emplace_back(prepareUniformBlock(block_rows)); + blocks.emplace_back(prepareUniformBlock(0)); + } + Block header = blocks.back(); + + // 2. Build MockStreamWriter. + std::unordered_map> write_report; + auto checker = [&write_report](mpp::MPPDataPacket & packet, uint16_t part_id) { + write_report[part_id].emplace_back(std::move(packet)); + }; + auto mock_writer = std::make_shared(checker, part_num); + + // 3. Start to write. + auto dag_writer = std::make_shared>>( + mock_writer, + part_col_ids, + part_col_collators, + batch_send_min_limit, + should_send_exec_summary_at_last, + *dag_context_ptr); + for (const auto & block : blocks) + dag_writer->write(block); + dag_writer->finishWrite(); + + // 4. Start to check write_report. + size_t per_part_rows = block_rows * block_num / part_num; + ASSERT_EQ(write_report.size(), part_num); + for (const auto & ele : write_report) + { + size_t decoded_block_rows = 0; + for (const auto & packet : ele.second) + { + for (int i = 0; i < packet.chunks_size(); ++i) + { + auto decoded_block = CHBlockChunkCodec::decode(packet.chunks(i), header); + decoded_block_rows += decoded_block.rows(); + } + } + ASSERT_EQ(decoded_block_rows, per_part_rows); + } +} +CATCH + +TEST_F(TestMPPExchangeWriter, emptyBlockForHashPartitionWriter) +try +{ + const size_t batch_send_min_limit = 108; + const uint16_t part_num = 4; + const bool should_send_exec_summary_at_last = true; + + std::unordered_map write_report; + auto checker = [&write_report](mpp::MPPDataPacket & packet, uint16_t part_id) { + auto res = write_report.insert({part_id, packet}); + // Should always insert succeed. There is at most one packet per partition. + ASSERT_TRUE(res.second); + }; + auto mock_writer = std::make_shared(checker, part_num); + + auto dag_writer = std::make_shared>>( + mock_writer, + part_col_ids, + part_col_collators, + batch_send_min_limit, + should_send_exec_summary_at_last, + *dag_context_ptr); + dag_writer->finishWrite(); + + // For `should_send_exec_summary_at_last = true`, there is at least one packet used to pass execution summary. + ASSERT_EQ(write_report.size(), 1); + ASSERT_EQ(write_report.cbegin()->second.chunks_size(), 0); +} +CATCH + +TEST_F(TestMPPExchangeWriter, testBroadcastOrPassThroughWriter) +try +{ + const size_t block_rows = 64; + const size_t block_num = 64; + const size_t batch_send_min_limit = 108; + + const bool should_send_exec_summary_at_last = true; + + // 1. Build Blocks. 
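+    // Interleave non-empty and empty blocks; the empty ones should contribute no rows,
+    // so the expected total stays block_rows * block_num.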
+ std::vector blocks; + for (size_t i = 0; i < block_num; ++i) + { + blocks.emplace_back(prepareRandomBlock(block_rows)); + blocks.emplace_back(prepareRandomBlock(0)); + } + Block header = blocks.back(); + + // 2. Build MockStreamWriter. + std::vector write_report; + auto checker = [&write_report](mpp::MPPDataPacket & packet, uint16_t part_id) { + ASSERT_EQ(part_id, 0); + write_report.emplace_back(std::move(packet)); + }; + auto mock_writer = std::make_shared(checker, 1); + + // 3. Start to write. + auto dag_writer = std::make_shared>>( + mock_writer, + batch_send_min_limit, + should_send_exec_summary_at_last, + *dag_context_ptr); + for (const auto & block : blocks) + dag_writer->write(block); + dag_writer->finishWrite(); + + // 4. Start to check write_report. + size_t expect_rows = block_rows * block_num; + size_t decoded_block_rows = 0; + for (const auto & packet : write_report) + { + for (int i = 0; i < packet.chunks_size(); ++i) + { + auto decoded_block = CHBlockChunkCodec::decode(packet.chunks(i), header); + decoded_block_rows += decoded_block.rows(); + } + } + ASSERT_EQ(decoded_block_rows, expect_rows); +} +CATCH + +TEST_F(TestMPPExchangeWriter, emptyBlockForBroadcastOrPassThroughWriter) +try +{ + const size_t batch_send_min_limit = 108; + const bool should_send_exec_summary_at_last = true; + + std::vector write_report; + auto checker = [&write_report](mpp::MPPDataPacket & packet, uint16_t part_id) { + ASSERT_EQ(part_id, 0); + write_report.emplace_back(std::move(packet)); + }; + auto mock_writer = std::make_shared(checker, 1); + + auto dag_writer = std::make_shared>>( + mock_writer, + batch_send_min_limit, + should_send_exec_summary_at_last, + *dag_context_ptr); + dag_writer->finishWrite(); + + // For `should_send_exec_summary_at_last = true`, there is at least one packet used to pass execution summary. 
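+    // Here exactly one such packet is expected; it carries the execution summary and no data chunks.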
+ ASSERT_EQ(write_report.size(), 1); + ASSERT_EQ(write_report.back().chunks_size(), 0); +} +CATCH + +} // namespace tests +} // namespace DB diff --git a/dbms/src/Flash/Planner/plans/PhysicalExchangeSender.cpp b/dbms/src/Flash/Planner/plans/PhysicalExchangeSender.cpp index a67b7cdc131..4b9faee1707 100644 --- a/dbms/src/Flash/Planner/plans/PhysicalExchangeSender.cpp +++ b/dbms/src/Flash/Planner/plans/PhysicalExchangeSender.cpp @@ -18,7 +18,7 @@ #include #include #include -#include +#include #include #include @@ -59,46 +59,31 @@ void PhysicalExchangeSender::transformImpl(DAGPipeline & pipeline, Context & con RUNTIME_ASSERT(dag_context.isMPPTask() && dag_context.tunnel_set != nullptr, log, "exchange_sender only run in MPP"); - int stream_id = 0; - + String extra_info; if (fine_grained_shuffle.enable()) { - pipeline.transform([&](auto & stream) { - // construct writer - std::unique_ptr response_writer = std::make_unique>( - dag_context.tunnel_set, - partition_col_ids, - partition_col_collators, - exchange_type, - context.getSettingsRef().dag_records_per_chunk, - context.getSettingsRef().batch_send_min_limit, - stream_id++ == 0, /// only one stream needs to sending execution summaries for the last response - dag_context, - fine_grained_shuffle.stream_count, - fine_grained_shuffle.batch_size); - stream = std::make_shared(stream, std::move(response_writer), log->identifier()); - stream->setExtraInfo(String(enableFineGrainedShuffleExtraInfo)); - }); + extra_info = String(enableFineGrainedShuffleExtraInfo); RUNTIME_CHECK(exchange_type == tipb::ExchangeType::Hash, ExchangeType_Name(exchange_type)); RUNTIME_CHECK(fine_grained_shuffle.stream_count <= 1024, fine_grained_shuffle.stream_count); } - else - { - pipeline.transform([&](auto & stream) { - std::unique_ptr response_writer = std::make_unique>( - dag_context.tunnel_set, - partition_col_ids, - partition_col_collators, - exchange_type, - context.getSettingsRef().dag_records_per_chunk, - context.getSettingsRef().batch_send_min_limit, - stream_id++ == 0, /// only one stream needs to sending execution summaries for the last response - dag_context, - fine_grained_shuffle.stream_count, - fine_grained_shuffle.batch_size); - stream = std::make_shared(stream, std::move(response_writer), log->identifier()); - }); - } + int stream_id = 0; + pipeline.transform([&](auto & stream) { + // construct writer + std::unique_ptr response_writer = newMPPExchangeWriter( + dag_context.tunnel_set, + partition_col_ids, + partition_col_collators, + exchange_type, + context.getSettingsRef().dag_records_per_chunk, + context.getSettingsRef().batch_send_min_limit, + stream_id++ == 0, /// only one stream needs to sending execution summaries for the last response + dag_context, + fine_grained_shuffle.enable(), + fine_grained_shuffle.stream_count, + fine_grained_shuffle.batch_size); + stream = std::make_shared(stream, std::move(response_writer), log->identifier()); + stream->setExtraInfo(extra_info); + }); } void PhysicalExchangeSender::finalize(const Names & parent_require) diff --git a/dbms/src/Flash/tests/bench_exchange.cpp b/dbms/src/Flash/tests/bench_exchange.cpp index d6e3f3e825e..3454242f2a7 100644 --- a/dbms/src/Flash/tests/bench_exchange.cpp +++ b/dbms/src/Flash/tests/bench_exchange.cpp @@ -15,8 +15,9 @@ #include #include -#include // to include the implementation of StreamingDAGResponseWriter #include // to include the implementation of ExchangeReceiver +#include // to include the implementation of FineGrainedShuffleWriter +#include // to include the implementation of 
HashPartitionWriter
 #include // to include the implementation of MPPTunnel
 #include // to include the implementation of MPPTunnelSet
 #include
@@ -284,13 +285,10 @@ BlockInputStreamPtr SenderHelper::buildUnionStream(
         if (enableFineGrainedShuffle(fine_grained_shuffle_stream_count))
         {
             std::unique_ptr response_writer(
-                new StreamingDAGResponseWriter(
+                new FineGrainedShuffleWriter(
                     tunnel_set,
                     {0, 1, 2},
                     TiDB::TiDBCollators(3),
-                    tipb::Hash,
-                    -1,
-                    -1,
                     true,
                     *dag_context,
                     fine_grained_shuffle_stream_count,
@@ -300,17 +298,13 @@ BlockInputStreamPtr SenderHelper::buildUnionStream(
         else
         {
             std::unique_ptr response_writer(
-                new StreamingDAGResponseWriter(
+                new HashPartitionWriter(
                     tunnel_set,
                     {0, 1, 2},
                     TiDB::TiDBCollators(3),
-                    tipb::Hash,
-                    -1,
                     -1,
                     true,
-                    *dag_context,
-                    fine_grained_shuffle_stream_count,
-                    fine_grained_shuffle_batch_size));
+                    *dag_context));
             send_streams.push_back(std::make_shared(stream, std::move(response_writer), /*req_id=*/""));
         }
     }
@@ -327,13 +321,10 @@ BlockInputStreamPtr SenderHelper::buildUnionStream(size_t total_rows, const std:
         if (enableFineGrainedShuffle(fine_grained_shuffle_stream_count))
        {
             std::unique_ptr response_writer(
-                new StreamingDAGResponseWriter(
+                new FineGrainedShuffleWriter(
                     tunnel_set,
                     {0, 1, 2},
                     TiDB::TiDBCollators(3),
-                    tipb::Hash,
-                    -1,
-                    -1,
                     true,
                     *dag_context,
                     fine_grained_shuffle_stream_count,
@@ -343,17 +334,13 @@ BlockInputStreamPtr SenderHelper::buildUnionStream(size_t total_rows, const std:
         else
         {
             std::unique_ptr response_writer(
-                new StreamingDAGResponseWriter(
+                new HashPartitionWriter(
                     tunnel_set,
                     {0, 1, 2},
                     TiDB::TiDBCollators(3),
-                    tipb::Hash,
-                    -1,
                     -1,
                     true,
-                    *dag_context,
-                    fine_grained_shuffle_stream_count,
-                    fine_grained_shuffle_batch_size));
+                    *dag_context));
             send_streams.push_back(std::make_shared(stream, std::move(response_writer), /*req_id=*/""));
         }
     }