From 38941c9d08ad8d0b8b95f8818caeb103d7f75f88 Mon Sep 17 00:00:00 2001 From: yibin Date: Fri, 21 Oct 2022 17:38:47 +0800 Subject: [PATCH 01/13] Implement decode in CHBlockChunkCodec class Signed-off-by: yibin --- .../DataStreams/NativeBlockInputStream.cpp | 31 +------ dbms/src/DataStreams/NativeBlockInputStream.h | 15 +--- .../Flash/Coprocessor/CHBlockChunkCodec.cpp | 87 +++++++++++++++++-- .../src/Flash/Coprocessor/CHBlockChunkCodec.h | 12 ++- dbms/src/Flash/Coprocessor/CodecUtils.cpp | 49 +++++++++++ dbms/src/Flash/Coprocessor/CodecUtils.h | 39 +++++++++ 6 files changed, 183 insertions(+), 50 deletions(-) create mode 100644 dbms/src/Flash/Coprocessor/CodecUtils.cpp create mode 100644 dbms/src/Flash/Coprocessor/CodecUtils.h diff --git a/dbms/src/DataStreams/NativeBlockInputStream.cpp b/dbms/src/DataStreams/NativeBlockInputStream.cpp index 17315a5598e..3c71a9cc29d 100644 --- a/dbms/src/DataStreams/NativeBlockInputStream.cpp +++ b/dbms/src/DataStreams/NativeBlockInputStream.cpp @@ -20,7 +20,6 @@ #include #include #include - #include @@ -31,32 +30,8 @@ namespace ErrorCodes extern const int INCORRECT_INDEX; extern const int LOGICAL_ERROR; extern const int CANNOT_READ_ALL_DATA; -extern const int NOT_IMPLEMENTED; } // namespace ErrorCodes -namespace -{ -void checkColumnSize(size_t expected, size_t actual) -{ - if (expected != actual) - throw Exception( - fmt::format("NativeBlockInputStream schema mismatch, expected {}, actual {}.", expected, actual), - ErrorCodes::LOGICAL_ERROR); -} - -void checkDataTypeName(size_t column_index, const String & expected, const String & actual) -{ - if (expected != actual) - throw Exception( - fmt::format( - "NativeBlockInputStream schema mismatch at column {}, expected {}, actual {}", - column_index, - expected, - actual), - ErrorCodes::LOGICAL_ERROR); -} -} // namespace - NativeBlockInputStream::NativeBlockInputStream( ReadBuffer & istr_, UInt64 server_revision_, @@ -179,9 +154,9 @@ Block NativeBlockInputStream::readImpl() } if (header) - checkColumnSize(header.columns(), columns); + CodecUtils::checkColumnSize(header.columns(), columns); else if (!output_names.empty()) - checkColumnSize(output_names.size(), columns); + CodecUtils::checkColumnSize(output_names.size(), columns); for (size_t i = 0; i < columns; ++i) { @@ -208,7 +183,7 @@ Block NativeBlockInputStream::readImpl() readBinary(type_name, istr); if (header) { - checkDataTypeName(i, header_datatypes[i].name, type_name); + CodecUtils::checkDataTypeName(i, header_datatypes[i].name, type_name); column.type = header_datatypes[i].type; } else diff --git a/dbms/src/DataStreams/NativeBlockInputStream.h b/dbms/src/DataStreams/NativeBlockInputStream.h index 6a3c2eedd76..52ff35b86ff 100644 --- a/dbms/src/DataStreams/NativeBlockInputStream.h +++ b/dbms/src/DataStreams/NativeBlockInputStream.h @@ -18,6 +18,7 @@ #include #include #include +#include namespace DB { @@ -116,19 +117,7 @@ class NativeBlockInputStream : public IProfilingBlockInputStream Block header; UInt64 server_revision; bool align_column_name_with_header = false; - - struct DataTypeWithTypeName - { - DataTypeWithTypeName(const DataTypePtr & t, const String & n) - : type(t) - , name(n) - { - } - - DataTypePtr type; - String name; - }; - std::vector header_datatypes; + std::vector header_datatypes; bool use_index = false; IndexForNativeFormat::Blocks::const_iterator index_block_it; diff --git a/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp b/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp index 8f3d6e10d67..430af0f51c5 100644 --- a/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp +++ b/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp @@ -18,6 +18,7 @@ #include #include #include +#include namespace DB { @@ -48,6 +49,20 @@ class CHBlockChunkCodecStream : public ChunkCodecStream DataTypes expected_types; }; +CHBlockChunkCodec::CHBlockChunkCodec( + const Block & header_) + : header(header_) +{ + for (const auto & column : header) + header_datatypes.emplace_back(column.type, column.type->getName()); +} + +CHBlockChunkCodec::CHBlockChunkCodec(const DAGSchema & schema) +{ + for (const auto & c : schema) + output_names.push_back(c.first); +} + size_t getExtraInfoSize(const Block & block) { size_t size = 64; /// to hold some length of structures, such as column number, row number... @@ -83,6 +98,14 @@ void writeData(const IDataType & type, const ColumnPtr & column, WriteBuffer & o type.serializeBinaryBulkWithMultipleStreams(*full_column, output_stream_getter, offset, limit, false, {}); } +void readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows) +{ + IDataType::InputStreamGetter input_stream_getter = [&](const IDataType::SubstreamPath &) { + return &istr; + }; + type.deserializeBinaryBulkWithMultipleStreams(column, input_stream_getter, rows, 0, false, {}); +} + void CHBlockChunkCodecStream::encode(const Block & block, size_t start, size_t end) { /// only check block schema in CHBlock codec because for both @@ -120,21 +143,69 @@ std::unique_ptr CHBlockChunkCodec::newCodecStream(const std::v return std::make_unique(field_types); } +Block CHBlockChunkCodec::decodeImpl(ReadBuffer & istr) +{ + Block res; + if (istr.eof()) + { + return res; + } + + /// Dimensions + size_t columns = 0; + size_t rows = 0; + readVarUInt(columns, istr); + readVarUInt(rows, istr); + + if (header) + CodecUtils::checkColumnSize(header.columns(), columns); + else if (!output_names.empty()) + CodecUtils::checkColumnSize(output_names.size(), columns); + + for (size_t i = 0; i < columns; ++i) + { + ColumnWithTypeAndName column; + /// Name + readBinary(column.name, istr); + if (header) + column.name = header.getByPosition(i).name; + else if (!output_names.empty()) + column.name = output_names[i]; + + /// Type + String type_name; + readBinary(type_name, istr); + const DataTypeFactory & data_type_factory = DataTypeFactory::instance(); + if (header) + { + CodecUtils::checkDataTypeName(i, header_datatypes[i].name, type_name); + column.type = header_datatypes[i].type; + } + else + { + column.type = data_type_factory.get(type_name); + } + + /// Data + MutableColumnPtr read_column = column.type->createColumn(); + if (rows) /// If no rows, nothing to read. + readData(*column.type, *read_column, istr, rows); + + column.column = std::move(read_column); + res.insert(std::move(column)); + } + return res; +} + Block CHBlockChunkCodec::decode(const String & str, const DAGSchema & schema) { ReadBufferFromString read_buffer(str); - std::vector output_names; - for (const auto & c : schema) - output_names.push_back(c.first); - NativeBlockInputStream block_in(read_buffer, 0, std::move(output_names)); - return block_in.read(); + return CHBlockChunkCodec(schema).decodeImpl(read_buffer); } Block CHBlockChunkCodec::decode(const String & str, const Block & header) { ReadBufferFromString read_buffer(str); - NativeBlockInputStream block_in(read_buffer, header, 0, /*align_column_name_with_header=*/true); - return block_in.read(); + return CHBlockChunkCodec(header).decodeImpl(read_buffer); } - } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.h b/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.h index 79dc797cb47..9c8219b4a52 100644 --- a/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.h +++ b/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.h @@ -15,16 +15,26 @@ #pragma once #include +#include namespace DB { -class CHBlockChunkCodec : public ChunkCodec +class CHBlockChunkCodec final : public ChunkCodec { public: CHBlockChunkCodec() = default; + CHBlockChunkCodec(const Block & header_); + CHBlockChunkCodec(const DAGSchema & schema); + Block decode(const String &, const DAGSchema & schema) override; static Block decode(const String &, const Block & header); std::unique_ptr newCodecStream(const std::vector & field_types) override; + +private: + Block decodeImpl(ReadBuffer & istr); + Block header; + std::vector header_datatypes; + std::vector output_names; }; } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/CodecUtils.cpp b/dbms/src/Flash/Coprocessor/CodecUtils.cpp new file mode 100644 index 00000000000..174e513ed4a --- /dev/null +++ b/dbms/src/Flash/Coprocessor/CodecUtils.cpp @@ -0,0 +1,49 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +namespace DB +{ +namespace ErrorCodes +{ +extern const int LOGICAL_ERROR; +} // namespace ErrorCodes + +namespace CodecUtils +{ +void checkColumnSize(size_t expected, size_t actual) +{ + if (expected != actual) + throw Exception( + fmt::format("NativeBlockInputStream schema mismatch, expected {}, actual {}.", expected, actual), + ErrorCodes::LOGICAL_ERROR); +} + +void checkDataTypeName(size_t column_index, const String & expected, const String & actual) +{ + if (expected != actual) + throw Exception( + fmt::format( + "NativeBlockInputStream schema mismatch at column {}, expected {}, actual {}", + column_index, + expected, + actual), + ErrorCodes::LOGICAL_ERROR); +} + +} // namespace CodecUtils +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/CodecUtils.h b/dbms/src/Flash/Coprocessor/CodecUtils.h new file mode 100644 index 00000000000..eb21ef9b8b4 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/CodecUtils.h @@ -0,0 +1,39 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace DB +{ +namespace CodecUtils +{ +struct DataTypeWithTypeName +{ + DataTypeWithTypeName(const DataTypePtr & t, const String & n) + : type(t) + , name(n) + { + } + + DataTypePtr type; + String name; +}; + +void checkColumnSize(size_t expected, size_t actual); +void checkDataTypeName(size_t column_index, const String & expected, const String & actual); + +} // namespace CodecUtils +} // namespace DB \ No newline at end of file From 56b8e9e78175a54ff0d3473ad243736c85425ce6 Mon Sep 17 00:00:00 2001 From: yibin Date: Wed, 26 Oct 2022 16:01:55 +0800 Subject: [PATCH 02/13] Opt TiRemoteBlockInputStream decode logic with squashing Signed-off-by: yibin --- .../DataStreams/TiRemoteBlockInputStream.h | 87 +++++----- .../Flash/Coprocessor/CHBlockChunkCodec.cpp | 70 ++++---- .../src/Flash/Coprocessor/CHBlockChunkCodec.h | 10 +- dbms/src/Flash/Coprocessor/CodecUtils.h | 1 - .../src/Flash/Coprocessor/CoprocessorReader.h | 42 +++-- dbms/src/Flash/Coprocessor/DecodeDetail.h | 4 + .../Coprocessor/IChunkDecodeAndSquash.cpp | 78 +++++++++ .../Flash/Coprocessor/IChunkDecodeAndSquash.h | 47 ++++++ .../tests/gtest_chunk_decode_and_squash.cpp | 156 ++++++++++++++++++ dbms/src/Flash/Mpp/ExchangeReceiver.cpp | 88 +++++++--- dbms/src/Flash/Mpp/ExchangeReceiver.h | 16 +- 11 files changed, 491 insertions(+), 108 deletions(-) create mode 100644 dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.cpp create mode 100644 dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.h create mode 100644 dbms/src/Flash/Coprocessor/tests/gtest_chunk_decode_and_squash.cpp diff --git a/dbms/src/DataStreams/TiRemoteBlockInputStream.h b/dbms/src/DataStreams/TiRemoteBlockInputStream.h index c128c4d260e..44a42c8f4b0 100644 --- a/dbms/src/DataStreams/TiRemoteBlockInputStream.h +++ b/dbms/src/DataStreams/TiRemoteBlockInputStream.h @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -64,6 +65,8 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream // CoprocessorBlockInputStream doesn't take care of this. size_t stream_id; + std::unique_ptr decoder_ptr; + void initRemoteExecutionSummaries(tipb::SelectResponse & resp, size_t index) { for (const auto & execution_summary : resp.execution_summaries()) @@ -84,6 +87,7 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream { if (resp.execution_summaries_size() == 0) return; + if (!execution_summaries_inited[index].load()) { initRemoteExecutionSummaries(resp, index); @@ -128,51 +132,54 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream { while (true) { - auto result = remote_reader->nextResult(block_queue, sample_block, stream_id); - if (result.meet_error) - { - LOG_WARNING(log, "remote reader meets error: {}", result.error_msg); - throw Exception(result.error_msg); - } - if (result.eof) - return false; - if (result.resp != nullptr && result.resp->has_error()) + auto results = remote_reader->nextResult(block_queue, sample_block, stream_id, decoder_ptr); + for (auto & result : results) { - LOG_WARNING(log, "remote reader meets error: {}", result.resp->error().DebugString()); - throw Exception(result.resp->error().DebugString()); - } - /// only the last response contains execution summaries - if (result.resp != nullptr) - { - if constexpr (is_streaming_reader) + if (result.meet_error) { - addRemoteExecutionSummaries(*result.resp, result.call_index, true); + LOG_WARNING(log, "remote reader meets error: {}", result.error_msg); + throw Exception(result.error_msg); } - else + if (result.eof) + return false; + if (result.resp != nullptr && result.resp->has_error()) { - addRemoteExecutionSummaries(*result.resp, 0, false); + LOG_WARNING(log, "remote reader meets error: {}", result.resp->error().DebugString()); + throw Exception(result.resp->error().DebugString()); } - } + /// only the last response contains execution summaries + if (result.resp != nullptr) + { + if constexpr (is_streaming_reader) + { + addRemoteExecutionSummaries(*result.resp, result.call_index, true); + } + else + { + addRemoteExecutionSummaries(*result.resp, 0, false); + } + } + + const auto & decode_detail = result.decode_detail; - const auto & decode_detail = result.decode_detail; - - size_t index = 0; - if constexpr (is_streaming_reader) - index = result.call_index; - - ++connection_profile_infos[index].packets; - connection_profile_infos[index].bytes += decode_detail.packet_bytes; - - total_rows += decode_detail.rows; - LOG_TRACE( - log, - "recv {} rows from remote for {}, total recv row num: {}", - decode_detail.rows, - result.req_info, - total_rows); - if (decode_detail.rows > 0) - return true; - // else continue + size_t index = 0; + if constexpr (is_streaming_reader) + index = result.call_index; + + ++connection_profile_infos[index].packets; + connection_profile_infos[index].bytes += decode_detail.packet_bytes; + + total_rows += decode_detail.rows; + LOG_TRACE( + log, + "recv {} rows from remote for {}, total recv row num: {}", + decode_detail.rows, + result.req_info, + total_rows); + if (decode_detail.rows > 0) + return true; + // else continue + } } } @@ -193,6 +200,8 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream execution_summaries.resize(source_num); connection_profile_infos.resize(source_num); sample_block = Block(getColumnWithTypeAndName(toNamesAndTypes(remote_reader->getOutputSchema()))); + if constexpr (is_streaming_reader) + decoder_ptr = std::make_unique(sample_block, 8192); } Block getHeader() const override { return sample_block; } diff --git a/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp b/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp index 430af0f51c5..46e976adafd 100644 --- a/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp +++ b/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp @@ -98,7 +98,7 @@ void writeData(const IDataType & type, const ColumnPtr & column, WriteBuffer & o type.serializeBinaryBulkWithMultipleStreams(*full_column, output_stream_getter, offset, limit, false, {}); } -void readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows) +void CHBlockChunkCodec::readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows) { IDataType::InputStreamGetter input_stream_getter = [&](const IDataType::SubstreamPath &) { return &istr; @@ -143,7 +143,7 @@ std::unique_ptr CHBlockChunkCodec::newCodecStream(const std::v return std::make_unique(field_types); } -Block CHBlockChunkCodec::decodeImpl(ReadBuffer & istr) +Block CHBlockChunkCodec::decodeImpl(ReadBuffer & istr, size_t reserve_size) { Block res; if (istr.eof()) @@ -154,40 +154,18 @@ Block CHBlockChunkCodec::decodeImpl(ReadBuffer & istr) /// Dimensions size_t columns = 0; size_t rows = 0; - readVarUInt(columns, istr); - readVarUInt(rows, istr); - - if (header) - CodecUtils::checkColumnSize(header.columns(), columns); - else if (!output_names.empty()) - CodecUtils::checkColumnSize(output_names.size(), columns); + readBlockMeta(istr, columns, rows); for (size_t i = 0; i < columns; ++i) { ColumnWithTypeAndName column; - /// Name - readBinary(column.name, istr); - if (header) - column.name = header.getByPosition(i).name; - else if (!output_names.empty()) - column.name = output_names[i]; - - /// Type - String type_name; - readBinary(type_name, istr); - const DataTypeFactory & data_type_factory = DataTypeFactory::instance(); - if (header) - { - CodecUtils::checkDataTypeName(i, header_datatypes[i].name, type_name); - column.type = header_datatypes[i].type; - } - else - { - column.type = data_type_factory.get(type_name); - } + readColumnMeta(i, istr, column); /// Data MutableColumnPtr read_column = column.type->createColumn(); + if (reserve_size > 0) + read_column->reserve(reserve_size); + if (rows) /// If no rows, nothing to read. readData(*column.type, *read_column, istr, rows); @@ -196,6 +174,40 @@ Block CHBlockChunkCodec::decodeImpl(ReadBuffer & istr) } return res; } +void CHBlockChunkCodec::readBlockMeta(ReadBuffer & istr, size_t & columns, size_t & rows) const +{ + readVarUInt(columns, istr); + readVarUInt(rows, istr); + + if (header) + CodecUtils::checkColumnSize(header.columns(), columns); + else if (!output_names.empty()) + CodecUtils::checkColumnSize(output_names.size(), columns); +} + +void CHBlockChunkCodec::readColumnMeta(size_t i, ReadBuffer & istr, ColumnWithTypeAndName & column) +{ + /// Name + readBinary(column.name, istr); + if (header) + column.name = header.getByPosition(i).name; + else if (!output_names.empty()) + column.name = output_names[i]; + + /// Type + String type_name; + readBinary(type_name, istr); + const DataTypeFactory & data_type_factory = DataTypeFactory::instance(); + if (header) + { + CodecUtils::checkDataTypeName(i, header_datatypes[i].name, type_name); + column.type = header_datatypes[i].type; + } + else + { + column.type = data_type_factory.get(type_name); + } +} Block CHBlockChunkCodec::decode(const String & str, const DAGSchema & schema) { diff --git a/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.h b/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.h index 9c8219b4a52..2fa520e19a9 100644 --- a/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.h +++ b/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.h @@ -19,6 +19,8 @@ namespace DB { +class CHBlockChunkDecodeAndSquash; + class CHBlockChunkCodec final : public ChunkCodec { public: @@ -31,7 +33,13 @@ class CHBlockChunkCodec final : public ChunkCodec std::unique_ptr newCodecStream(const std::vector & field_types) override; private: - Block decodeImpl(ReadBuffer & istr); + friend class CHBlockChunkDecodeAndSquash; + void readColumnMeta(size_t i, ReadBuffer & istr, ColumnWithTypeAndName & column); + void readBlockMeta(ReadBuffer & istr, size_t & columns, size_t & rows) const; + static void readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows); + /// 'reserve_size' used for Squash usage, and takes effect when 'reserve_size' > 0 + Block decodeImpl(ReadBuffer & istr, size_t reserve_size = 0); + Block header; std::vector header_datatypes; std::vector output_names; diff --git a/dbms/src/Flash/Coprocessor/CodecUtils.h b/dbms/src/Flash/Coprocessor/CodecUtils.h index eb21ef9b8b4..c668c622fda 100644 --- a/dbms/src/Flash/Coprocessor/CodecUtils.h +++ b/dbms/src/Flash/Coprocessor/CodecUtils.h @@ -34,6 +34,5 @@ struct DataTypeWithTypeName void checkColumnSize(size_t expected, size_t actual); void checkDataTypeName(size_t column_index, const String & expected, const String & actual); - } // namespace CodecUtils } // namespace DB \ No newline at end of file diff --git a/dbms/src/Flash/Coprocessor/CoprocessorReader.h b/dbms/src/Flash/Coprocessor/CoprocessorReader.h index b48fdbcd6dc..73dde5f7e0a 100644 --- a/dbms/src/Flash/Coprocessor/CoprocessorReader.h +++ b/dbms/src/Flash/Coprocessor/CoprocessorReader.h @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -139,35 +140,50 @@ class CoprocessorReader return detail; } - // stream_id is only meaningful for ExchagneReceiver. - CoprocessorReaderResult nextResult(std::queue & block_queue, const Block & header, size_t /*stream_id*/) + // stream_id, decoder_ptr are only meaningful for ExchagneReceiver. + std::vector nextResult(std::queue & block_queue, const Block & header, size_t /*stream_id*/, std::unique_ptr & /*decoder_ptr*/) { + std::vector results; auto && [result, has_next] = resp_iter.next(); if (!result.error.empty()) - return {nullptr, true, result.error.message(), false}; + { + results.push_back({nullptr, true, result.error.message(), false}); + return results; + } + if (!has_next) - return {nullptr, false, "", true}; + { + results.push_back({nullptr, false, "", true}); + return results; + } auto resp = std::make_shared(); if (resp->ParseFromString(result.data())) { if (resp->has_error()) { - return {nullptr, true, resp->error().DebugString(), false}; + results.push_back({nullptr, true, resp->error().DebugString(), false}); + return results; } else if (has_enforce_encode_type && resp->encode_type() != tipb::EncodeType::TypeCHBlock && resp->chunks_size() > 0) - return { - nullptr, - true, - "Encode type of coprocessor response is not CHBlock, " - "maybe the version of some TiFlash node in the cluster is not match with this one", - false}; + { + results.push_back( + { + nullptr, + true, + "Encode type of coprocessor response is not CHBlock, " + "maybe the version of some TiFlash node in the cluster is not match with this one", + false}); + return results; + } auto detail = decodeChunks(resp, block_queue, header, schema); - return {resp, false, "", false, detail}; + results.push_back({resp, false, "", false, detail}); + return results; } else { - return {nullptr, true, "Error while decoding coprocessor::Response", false}; + results.push_back({nullptr, true, "Error while decoding coprocessor::Response", false}); + return results; } } diff --git a/dbms/src/Flash/Coprocessor/DecodeDetail.h b/dbms/src/Flash/Coprocessor/DecodeDetail.h index 91851650d9e..081854c4473 100644 --- a/dbms/src/Flash/Coprocessor/DecodeDetail.h +++ b/dbms/src/Flash/Coprocessor/DecodeDetail.h @@ -28,5 +28,9 @@ struct DecodeDetail // Total byte size of the origin packet, even for fine grained shuffle. Int64 packet_bytes = 0; + + // For decodeAndSquash usage, several packets might be squashed to produce one block. + // This flag is true only when: during the decode process of this packet, new block is produced. + bool produce_block_flag = true; }; } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.cpp b/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.cpp new file mode 100644 index 00000000000..78d796cf4a1 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.cpp @@ -0,0 +1,78 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +namespace DB +{ + +CHBlockChunkDecodeAndSquash::CHBlockChunkDecodeAndSquash( + const Block & header, + size_t rows_limit_) + : codec(header) + , rows_limit(rows_limit_) +{ +} + +std::optional CHBlockChunkDecodeAndSquash::decodeAndSquash(const String & str) +{ + std::optional res; + ReadBufferFromString istr(str); + if (istr.eof()) + { + if (accumulated_block) + res.swap(accumulated_block); + return res; + } + + if (!accumulated_block) { + /// hard-code 1.5 here, since final column size will be more than rows_limit in most situations, + /// so it should be larger than 1.0, just use 1.5 here, no special meaning + Block block = codec.decodeImpl(istr, static_cast(rows_limit * 1.5)); + if (block) + accumulated_block.emplace(std::move(block)); + } + else + { + /// Dimensions + size_t columns = 0; + size_t rows = 0; + codec.readBlockMeta(istr, columns, rows); + + auto mutable_columns = accumulated_block->mutateColumns(); + for (size_t i = 0; i < columns; ++i) + { + ColumnWithTypeAndName column; + codec.readColumnMeta(i, istr, column); + + /// Data + MutableColumnPtr read_column = column.type->createColumn(); + if (rows) /// If no rows, nothing to read. + CHBlockChunkCodec::readData(*column.type, *(mutable_columns[i]), istr, rows); + } + accumulated_block->setColumns(std::move(mutable_columns)); + } + + RUNTIME_ASSERT(accumulated_block); + if (accumulated_block->rows() >= rows_limit) + { + /// Return accumulated data and reset accumulated_block + res.swap(accumulated_block); + return res; + } + return res; +} + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.h b/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.h new file mode 100644 index 00000000000..6c20cdb58ca --- /dev/null +++ b/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.h @@ -0,0 +1,47 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +namespace DB +{ +class IChunkDecodeAndSquash +{ +public: + virtual ~IChunkDecodeAndSquash() = default; + /// The returned optional value can only have block that block.operator bool() is true + virtual std::optional decodeAndSquash(const String &) = 0; + /// The returned optional value can only have block that block.operator bool() is true + virtual std::optional flush() = 0; +}; + +class CHBlockChunkDecodeAndSquash final : public IChunkDecodeAndSquash +{ +public: + explicit CHBlockChunkDecodeAndSquash(const Block & header, size_t rows_limit_); + virtual ~CHBlockChunkDecodeAndSquash() = default; + std::optional decodeAndSquash(const String &); + std::optional flush() { return accumulated_block; } +private: + CHBlockChunkCodec codec; + std::optional accumulated_block; + size_t rows_limit; +}; + + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_chunk_decode_and_squash.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_chunk_decode_and_squash.cpp new file mode 100644 index 00000000000..76c4c04d5b5 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/tests/gtest_chunk_decode_and_squash.cpp @@ -0,0 +1,156 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +namespace DB +{ +namespace tests +{ +class TestChunkDecodeAndSquash : public testing::Test +{ +protected: + void SetUp() override + { + } + +public: + TestChunkDecodeAndSquash() + : context(TiFlashTestEnv::getContext()) + {} + + // Return 10 Int64 column. + static std::vector makeFields() + { + std::vector fields(10); + for (int i = 0; i < 10; ++i) + { + fields[i].set_tp(TiDB::TypeLongLong); + fields[i].set_flag(TiDB::ColumnFlagNotNull); + } + return fields; + } + + static DAGSchema makeSchema() + { + auto fields = makeFields(); + DAGSchema schema; + for (size_t i = 0; i < fields.size(); ++i) + { + ColumnInfo info = TiDB::fieldTypeToColumnInfo(fields[i]); + schema.emplace_back(String("col") + std::to_string(i), std::move(info)); + } + return schema; + } + + // Return a block with **rows** and 10 Int64 column. + static Block prepareBlock(size_t rows) + { + Block block; + for (size_t i = 0; i < 10; ++i) + { + DataTypePtr int64_data_type = std::make_shared(); + auto int64_column = ColumnGenerator::instance().generate({rows, "Int64", RANDOM}).column; + block.insert(ColumnWithTypeAndName{ + std::move(int64_column), + int64_data_type, + String("col") + std::to_string(i)}); + } + return block; + } + + void doTestWork(bool flush_somthing) + { + const size_t block_rows = 1024; + const size_t block_num = 256; + std::mt19937_64 rand_gen; + // 1. Build Blocks. + std::vector blocks; + for (size_t i = 0; i < block_num; ++i) + { + UInt64 rows = flush_somthing ? static_cast(rand_gen()) % (block_rows * 4) : block_rows; + blocks.emplace_back(prepareBlock(rows)); + if (flush_somthing) + blocks.emplace_back(prepareBlock(0)); /// Adds this empty block, so even unluckily, total_rows % rows_limit == 0, it would flush an empty block with header + } + + // 2. encode all blocks + std::unique_ptr codec_stream = std::make_unique()->newCodecStream(makeFields()); + std::vector encode_str_vec(block_num); + for (const auto & block : blocks) + { + codec_stream->encode(block, 0, block.rows()); + encode_str_vec.push_back(codec_stream->getString()); + codec_stream->clear(); + } + + // 3. DecodeAndSquash all these blocks + Block header = blocks.back(); + std::vector decoded_blocks; + CHBlockChunkDecodeAndSquash decoder(header, block_rows * 4); + for (const auto & str : encode_str_vec) + { + auto result = decoder.decodeAndSquash(str); + if (result) + decoded_blocks.push_back(std::move(result.value())); + } + auto last_block = decoder.flush(); + if (last_block) + decoded_blocks.push_back(std::move(last_block.value())); + + // 4. Check correctness + std::vector reference_block_vec; + SquashingTransform squash_transform(block_rows * 4, 0, ""); + for (auto & block : blocks) + { + auto result = squash_transform.add(std::move(block)); + if (result.ready) + reference_block_vec.push_back(result.block); + } + Block empty; + auto last_result = squash_transform.add(std::move(empty)); + if (last_result.ready) + reference_block_vec.push_back(last_result.block); + + if (flush_somthing) + ASSERT_EQ(decoded_blocks.size(), reference_block_vec.size()); + else + ASSERT_EQ(decoded_blocks.size() + 1, reference_block_vec.size()); /// reference_block has an empty block at last + for (size_t i = 0; i < decoded_blocks.size(); ++i) + for (size_t j = 0; j < header.columns(); ++j) + ASSERT_COLUMN_EQ(reference_block_vec[i].getByPosition(j), decoded_blocks[i].getByPosition(j)); + } + Context context; +}; + +TEST_F(TestChunkDecodeAndSquash, testDecodeAndSquash) +try +{ + doTestWork(true); + doTestWork(false); +} +CATCH + +} // namespace tests +} // namespace DB diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index 521e9549cb5..2eaa91c4893 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "ExchangeReceiver.h" + #include #include #include @@ -681,7 +683,7 @@ template DecodeDetail ExchangeReceiverBase::decodeChunks( const std::shared_ptr & recv_msg, std::queue & block_queue, - const Block & header) + std::unique_ptr & decoder_ptr) { assert(recv_msg != nullptr); DecodeDetail detail; @@ -692,40 +694,83 @@ DecodeDetail ExchangeReceiverBase::decodeChunks( // Record total packet size even if fine grained shuffle is enabled. detail.packet_bytes = packet.ByteSizeLong(); - + detail.produce_block_flag = false; for (const String * chunk : recv_msg->chunks) { - Block block = CHBlockChunkCodec::decode(*chunk, header); - detail.rows += block.rows(); - if (unlikely(block.rows() == 0)) + auto result = decoder_ptr->decodeAndSquash(*chunk); + if (!result) continue; - block_queue.push(std::move(block)); + detail.rows += result->rows(); + detail.produce_block_flag = true; + if likely(result->rows() > 0) + block_queue.push(std::move(result.value())); } return detail; } template -ExchangeReceiverResult ExchangeReceiverBase::nextResult(std::queue & block_queue, const Block & header, size_t stream_id) +std::vector ExchangeReceiverBase::nextResult( + std::queue & block_queue, + const Block & header, + size_t stream_id, + std::unique_ptr & decoder_ptr) { + std::vector results; if (unlikely(stream_id >= msg_channels.size())) { LOG_ERROR(exc_log, "stream_id out of range, stream_id: {}, total_stream_count: {}", stream_id, msg_channels.size()); - return ExchangeReceiverResult::newError(0, "", "stream_id out of range"); + results.push_back(ExchangeReceiverResult::newError(0, "", "stream_id out of range")); + return results; } - std::shared_ptr recv_msg; - if (msg_channels[stream_id]->pop(recv_msg) != MPMCQueueResult::OK) + + while (true) { - std::unique_lock lock(mu); - return state != ExchangeReceiverState::NORMAL - ? ExchangeReceiverResult::newError(0, name, constructStatusString(state, err_msg)) - : ExchangeReceiverResult::newEOF(name); /// live_connections == 0, msg_channel is finished, and state is NORMAL, that is the end. + std::shared_ptr recv_msg; + if (msg_channels[stream_id]->pop(recv_msg) != MPMCQueueResult::OK) + { + handleUnnormalChannel(block_queue, results, decoder_ptr); + return results; + } + else + { + assert(recv_msg != nullptr); + if (unlikely(recv_msg->error_ptr != nullptr)) + { + results.push_back(ExchangeReceiverResult::newError(recv_msg->source_index, recv_msg->req_info, recv_msg->error_ptr->msg())); + return results; + } + const ExchangeReceiverResult & result = toDecodeResult(block_queue, header, recv_msg, decoder_ptr); + if (unlikely(result.meet_error) || result.decode_detail.produce_block_flag) + { + results.push_back(result); + return results; + } + } + } +} + +template +void ExchangeReceiverBase::handleUnnormalChannel( + std::queue & block_queue, + std::vector & results, + std::unique_ptr & decoder_ptr) +{ + std::optional last_block = decoder_ptr->flush(); + std::unique_lock lock(mu); + if (this->state != DB::ExchangeReceiverState::NORMAL) + { + results.push_back(DB::ExchangeReceiverResult::newError(0, DB::ExchangeReceiverBase::name, DB::constructStatusString(this->state, this->err_msg))); } else { - assert(recv_msg != nullptr); - if (unlikely(recv_msg->error_ptr != nullptr)) - return ExchangeReceiverResult::newError(recv_msg->source_index, recv_msg->req_info, recv_msg->error_ptr->msg()); - return toDecodeResult(block_queue, header, recv_msg); + if (last_block && last_block->rows() > 0) + { + block_queue.push(std::move(last_block.value())); + } + else + { + results.push_back(DB::ExchangeReceiverResult::newEOF(DB::ExchangeReceiverBase::name)); /// live_connections == 0, msg_channel is finished, and state is NORMAL, that is the end. + } } } @@ -733,7 +778,8 @@ template ExchangeReceiverResult ExchangeReceiverBase::toDecodeResult( std::queue & block_queue, const Block & header, - const std::shared_ptr & recv_msg) + const std::shared_ptr & recv_msg, + std::unique_ptr & decoder_ptr) { assert(recv_msg != nullptr); if (recv_msg->resp_ptr != nullptr) /// the data of the last packet is serialized from tipb::SelectResponse including execution summaries. @@ -757,7 +803,7 @@ ExchangeReceiverResult ExchangeReceiverBase::toDecodeResult( } else if (!recv_msg->chunks.empty()) { - result.decode_detail = decodeChunks(recv_msg, block_queue, header); + result.decode_detail = decodeChunks(recv_msg, block_queue, decoder_ptr); } return result; } @@ -765,7 +811,7 @@ ExchangeReceiverResult ExchangeReceiverBase::toDecodeResult( else /// the non-last packets { auto result = ExchangeReceiverResult::newOk(nullptr, recv_msg->source_index, recv_msg->req_info); - result.decode_detail = decodeChunks(recv_msg, block_queue, header); + result.decode_detail = decodeChunks(recv_msg, block_queue, decoder_ptr); return result; } } diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.h b/dbms/src/Flash/Mpp/ExchangeReceiver.h index b1c4caa0a80..15e203e19dd 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.h +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.h @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -139,10 +140,11 @@ class ExchangeReceiverBase const DAGSchema & getOutputSchema() const { return schema; } - ExchangeReceiverResult nextResult( + std::vector nextResult( std::queue & block_queue, const Block & header, - size_t stream_id); + size_t stream_id, + std::unique_ptr & decoder_ptr); size_t getSourceNum() const { return source_num; } uint64_t getFineGrainedShuffleStreamCount() const { return fine_grained_shuffle_stream_count; } @@ -177,10 +179,15 @@ class ExchangeReceiverBase bool setEndState(ExchangeReceiverState new_state); String getStatusString(); + void handleUnnormalChannel( + std::queue & block_queue, + std::vector & results, + std::unique_ptr & decoder_ptr); + DecodeDetail decodeChunks( const std::shared_ptr & recv_msg, std::queue & block_queue, - const Block & header); + std::unique_ptr & decoder_ptr); void connectionDone( bool meet_error, @@ -193,7 +200,8 @@ class ExchangeReceiverBase ExchangeReceiverResult toDecodeResult( std::queue & block_queue, const Block & header, - const std::shared_ptr & recv_msg); + const std::shared_ptr & recv_msg, + std::unique_ptr & decoder_ptr); private: std::shared_ptr rpc_context; From 7aa9f5b389d40ed274617671a5473b7fee74947a Mon Sep 17 00:00:00 2001 From: yibin Date: Fri, 28 Oct 2022 09:23:17 +0800 Subject: [PATCH 03/13] Add TiRemoteBlockInputstream gtest Signed-off-by: yibin --- .../DataStreams/TiRemoteBlockInputStream.h | 1 + .../Coprocessor/IChunkDecodeAndSquash.cpp | 9 + .../Flash/Coprocessor/IChunkDecodeAndSquash.h | 4 +- .../tests/gtest_chunk_decode_and_squash.cpp | 42 +- .../gtest_ti_remote_block_inputstream.cpp | 448 ++++++++++++++++++ dbms/src/Flash/Mpp/ExchangeReceiver.cpp | 14 +- 6 files changed, 487 insertions(+), 31 deletions(-) create mode 100644 dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp diff --git a/dbms/src/DataStreams/TiRemoteBlockInputStream.h b/dbms/src/DataStreams/TiRemoteBlockInputStream.h index 44a42c8f4b0..8ce6e43001a 100644 --- a/dbms/src/DataStreams/TiRemoteBlockInputStream.h +++ b/dbms/src/DataStreams/TiRemoteBlockInputStream.h @@ -231,6 +231,7 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream return execution_summaries_inited[index].load() ? &execution_summaries[index] : nullptr; } + size_t getTotalRows() const { return total_rows; } size_t getSourceNum() const { return source_num; } bool isStreamingCall() const { return is_streaming_reader; } const std::vector & getConnectionProfileInfos() const { return connection_profile_infos; } diff --git a/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.cpp b/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.cpp index 78d796cf4a1..2ec2b7285f3 100644 --- a/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.cpp +++ b/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.cpp @@ -75,4 +75,13 @@ std::optional CHBlockChunkDecodeAndSquash::decodeAndSquash(const String & return res; } +std::optional CHBlockChunkDecodeAndSquash::flush() +{ + if (!accumulated_block) + return accumulated_block; + std::optional res; + accumulated_block.swap(res); + return res; +} + } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.h b/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.h index 6c20cdb58ca..eb9f6077ae3 100644 --- a/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.h +++ b/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.h @@ -26,7 +26,7 @@ class IChunkDecodeAndSquash virtual ~IChunkDecodeAndSquash() = default; /// The returned optional value can only have block that block.operator bool() is true virtual std::optional decodeAndSquash(const String &) = 0; - /// The returned optional value can only have block that block.operator bool() is true + /// Return value should be false if 'flush' is invoked consecutively more than once virtual std::optional flush() = 0; }; @@ -36,7 +36,7 @@ class CHBlockChunkDecodeAndSquash final : public IChunkDecodeAndSquash explicit CHBlockChunkDecodeAndSquash(const Block & header, size_t rows_limit_); virtual ~CHBlockChunkDecodeAndSquash() = default; std::optional decodeAndSquash(const String &); - std::optional flush() { return accumulated_block; } + std::optional flush(); private: CHBlockChunkCodec codec; std::optional accumulated_block; diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_chunk_decode_and_squash.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_chunk_decode_and_squash.cpp index 76c4c04d5b5..262c973d43d 100644 --- a/dbms/src/Flash/Coprocessor/tests/gtest_chunk_decode_and_squash.cpp +++ b/dbms/src/Flash/Coprocessor/tests/gtest_chunk_decode_and_squash.cpp @@ -40,6 +40,17 @@ class TestChunkDecodeAndSquash : public testing::Test : context(TiFlashTestEnv::getContext()) {} + static Block squashBlocks(std::vector & blocks) + { + std::vector reference_block_vec; + SquashingTransform squash_transform(std::numeric_limits::max(), 0, ""); + for (auto & block : blocks) + squash_transform.add(std::move(block)); + Block empty; + auto result = squash_transform.add(std::move(empty)); + return result.block; + } + // Return 10 Int64 column. static std::vector makeFields() { @@ -80,7 +91,7 @@ class TestChunkDecodeAndSquash : public testing::Test return block; } - void doTestWork(bool flush_somthing) + void doTestWork(bool flush_something) { const size_t block_rows = 1024; const size_t block_num = 256; @@ -89,9 +100,9 @@ class TestChunkDecodeAndSquash : public testing::Test std::vector blocks; for (size_t i = 0; i < block_num; ++i) { - UInt64 rows = flush_somthing ? static_cast(rand_gen()) % (block_rows * 4) : block_rows; + UInt64 rows = flush_something ? static_cast(rand_gen()) % (block_rows * 4) : block_rows; blocks.emplace_back(prepareBlock(rows)); - if (flush_somthing) + if (flush_something) blocks.emplace_back(prepareBlock(0)); /// Adds this empty block, so even unluckily, total_rows % rows_limit == 0, it would flush an empty block with header } @@ -118,28 +129,13 @@ class TestChunkDecodeAndSquash : public testing::Test auto last_block = decoder.flush(); if (last_block) decoded_blocks.push_back(std::move(last_block.value())); + /// flush after flush should return empty optional + ASSERT_TRUE(!decoder.flush()); // 4. Check correctness - std::vector reference_block_vec; - SquashingTransform squash_transform(block_rows * 4, 0, ""); - for (auto & block : blocks) - { - auto result = squash_transform.add(std::move(block)); - if (result.ready) - reference_block_vec.push_back(result.block); - } - Block empty; - auto last_result = squash_transform.add(std::move(empty)); - if (last_result.ready) - reference_block_vec.push_back(last_result.block); - - if (flush_somthing) - ASSERT_EQ(decoded_blocks.size(), reference_block_vec.size()); - else - ASSERT_EQ(decoded_blocks.size() + 1, reference_block_vec.size()); /// reference_block has an empty block at last - for (size_t i = 0; i < decoded_blocks.size(); ++i) - for (size_t j = 0; j < header.columns(); ++j) - ASSERT_COLUMN_EQ(reference_block_vec[i].getByPosition(j), decoded_blocks[i].getByPosition(j)); + Block reference_block = squashBlocks(blocks); + Block decoded_block = squashBlocks(decoded_blocks); + ASSERT_BLOCK_EQ(reference_block, decoded_block); } Context context; }; diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp new file mode 100644 index 00000000000..76e36271698 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp @@ -0,0 +1,448 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace DB +{ +namespace tests +{ +using Packet = TrackedMppDataPacket; +using PacketPtr = std::shared_ptr; +using PacketQueue = MPMCQueue; +using PacketQueuePtr = std::shared_ptr; + +bool equalSummaries(const ExecutionSummary & left, const ExecutionSummary & right) +{ + return (left.concurrency == right.concurrency) && + (left.num_iterations == right.num_iterations) && + (left.num_produced_rows == right.num_produced_rows) && + (left.time_processed_ns == right.time_processed_ns); +} + +struct MockWriter +{ + explicit MockWriter(PacketQueuePtr queue_) + : queue(queue_) + {} + + ExecutionSummary mockExecutionSummary() + { + ExecutionSummary summary; + summary.time_processed_ns = 100; + summary.num_produced_rows = 10000; + summary.num_iterations = 50; + summary.concurrency = 1; + return summary; + } + + void write(mpp::MPPDataPacket &, uint16_t) { FAIL() << "cannot reach here."; } + void write(mpp::MPPDataPacket & packet) + { + auto tracked_packet = std::make_shared(packet, nullptr); + if (add_summary) + { + tipb::SelectResponse response; + auto * summary_ptr = response.add_execution_summaries(); + auto summary = mockExecutionSummary(); + summary_ptr->set_time_processed_ns(summary.time_processed_ns); + summary_ptr->set_num_produced_rows(summary.num_produced_rows); + summary_ptr->set_num_iterations(summary.num_iterations); + summary_ptr->set_concurrency(summary.concurrency); + summary_ptr->set_executor_id("Executor_0"); + tracked_packet->serializeByResponse(response); + } + ++total_packets; + if (!tracked_packet->packet.chunks().empty()) + total_bytes += tracked_packet->packet.ByteSizeLong(); + queue->push(tracked_packet); + } + void write(tipb::SelectResponse &, uint16_t) { FAIL() << "cannot reach here."; } + void write(tipb::SelectResponse & response) + { + ++total_packets; + if (!response.chunks().empty()) + total_bytes += response.ByteSizeLong(); + mpp::MPPDataPacket packet; + auto tracked_packet = std::make_shared(packet, nullptr); + tracked_packet->serializeByResponse(response); + queue->push(tracked_packet); + } + uint16_t getPartitionNum() const { return 1; } + + PacketQueuePtr queue; + bool add_summary = false; + size_t total_packets = 0; + size_t total_bytes = 0; +}; + +// NOLINTBEGIN(readability-convert-member-functions-to-static) +struct MockReceiverContext +{ + using Status = ::grpc::Status; + struct Request + { + String debugString() const + { + return "{Request}"; + } + + int source_index = 0; + int send_task_id = 0; + int recv_task_id = -1; + }; + + struct Reader + { + explicit Reader(const PacketQueuePtr & queue_) + : queue(queue_) + {} + + void initialize() const + { + } + + bool read(PacketPtr & packet [[maybe_unused]]) const + { + PacketPtr res; + if (queue->pop(res) == MPMCQueueResult::OK) + { + *packet = *res; // avoid change shared packets + return true; + } + return false; + } + + Status finish() const + { + return ::grpc::Status(); + } + + void cancel(const String &) + { + } + + PacketQueuePtr queue; + }; + + struct MockAsyncGrpcExchangePacketReader + { + // Not implement benchmark for Async GRPC for now. + void init(UnaryCallback *) { assert(0); } + void read(TrackedMppDataPacketPtr &, UnaryCallback *) { assert(0); } + void finish(::grpc::Status &, UnaryCallback *) { assert(0); } + }; + + using AsyncReader = MockAsyncGrpcExchangePacketReader; + + MockReceiverContext( + PacketQueuePtr & queue_, + const std::vector & field_types_) + : queue(queue_) + , field_types(field_types_) + { + } + + void fillSchema(DAGSchema & schema) const + { + schema.clear(); + for (size_t i = 0; i < field_types.size(); ++i) + { + String name = "exchange_receiver_" + std::to_string(i); + ColumnInfo info = TiDB::fieldTypeToColumnInfo(field_types[i]); + schema.emplace_back(std::move(name), std::move(info)); + } + } + + Request makeRequest(int index) const + { + return {index, index, -1}; + } + + std::shared_ptr makeReader(const Request &) + { + return std::make_shared(queue); + } + + static Status getStatusOK() + { + return ::grpc::Status(); + } + + bool supportAsync(const Request &) const { return false; } + void makeAsyncReader( + const Request &, + std::shared_ptr &, + UnaryCallback *) const {} + + PacketQueuePtr queue; + std::vector field_types; +}; +using MockExchangeReceiver = ExchangeReceiverBase; +using MockWriterPtr = std::shared_ptr; +using MockExchangeReceiverInputStream = TiRemoteBlockInputStream; + +class TestTiRemoteBlockInputStream : public testing::Test +{ +protected: + void SetUp() override + { + dag_context_ptr = std::make_unique(1024); + dag_context_ptr->is_mpp_task = true; + dag_context_ptr->is_root_mpp_task = true; + dag_context_ptr->result_field_types = makeFields(); + dag_context_ptr->encode_type = tipb::EncodeType::TypeCHBlock; + context.setDAGContext(dag_context_ptr.get()); + } + +public: + TestTiRemoteBlockInputStream() + : context(TiFlashTestEnv::getContext()) + {} + + static Block squashBlocks(std::vector & blocks) + { + std::vector reference_block_vec; + SquashingTransform squash_transform(std::numeric_limits::max(), 0, ""); + for (auto & block : blocks) + squash_transform.add(std::move(block)); + Block empty; + auto result = squash_transform.add(std::move(empty)); + return result.block; + } + + // Return 10 Int64 column. + static std::vector makeFields() + { + std::vector fields(10); + for (int i = 0; i < 10; ++i) + { + fields[i].set_tp(TiDB::TypeLongLong); + fields[i].set_flag(TiDB::ColumnFlagNotNull); + } + return fields; + } + + static DAGSchema makeSchema() + { + auto fields = makeFields(); + DAGSchema schema; + for (size_t i = 0; i < fields.size(); ++i) + { + ColumnInfo info = TiDB::fieldTypeToColumnInfo(fields[i]); + schema.emplace_back(String("col") + std::to_string(i), std::move(info)); + } + return schema; + } + + // Return a block with **rows** and 10 Int64 column. + static Block prepareBlock(size_t rows) + { + Block block; + for (size_t i = 0; i < 10; ++i) + { + DataTypePtr int64_data_type = std::make_shared(); + auto int64_column = ColumnGenerator::instance().generate({rows, "Int64", RANDOM}).column; + block.insert(ColumnWithTypeAndName{ + std::move(int64_column), + int64_data_type, + String("col") + std::to_string(i)}); + } + return block; + } + + static void prepareBlocks( + std::vector & source_blocks, + bool empty_last_packet) + { + const size_t block_rows = 8192 * 3 / 8; + /// 61 is specially chosen so that the last packet contains 3072 rows, and then end of the queue + size_t block_num = empty_last_packet ? 60 : 61; + // 1. Build Blocks. + for (size_t i = 0; i < block_num; ++i) + source_blocks.emplace_back(prepareBlock(block_rows)); + } + + void prepareQueue( + std::shared_ptr & writer, + std::vector & source_blocks, + bool empty_last_packet) + { + prepareBlocks(source_blocks, empty_last_packet); + + const size_t batch_send_min_limit = 4096; + auto dag_writer = std::make_shared>( + writer, + batch_send_min_limit, + true, + *dag_context_ptr); + + // 2. encode all blocks + for (const auto & block : source_blocks) + dag_writer->write(block); + writer->add_summary = true; + dag_writer->finishWrite(); + } + + void prepareQueueV2( + std::shared_ptr & writer, + std::vector & source_blocks, + bool empty_last_packet) + { + dag_context_ptr->encode_type = tipb::EncodeType::TypeCHBlock; + prepareBlocks(source_blocks, empty_last_packet); + + const size_t batch_send_min_limit = 4096; + auto dag_writer = std::make_shared>( + writer, + 0, + batch_send_min_limit, + true, + *dag_context_ptr); + + // 2. encode all blocks + for (const auto & block : source_blocks) + dag_writer->write(block); + dag_writer->finishWrite(); + } + + void checkChunkInResponse( + std::vector & source_blocks, + std::vector & decoded_blocks, + std::shared_ptr & receiver_stream, + std::shared_ptr & writer) + { + /// Check Connection Info + auto infos = receiver_stream->getConnectionProfileInfos(); + ASSERT_EQ(infos.size(), 1); + ASSERT_EQ(infos[0].packets, writer->total_packets); + ASSERT_EQ(infos[0].bytes, writer->total_bytes); + + Block reference_block = squashBlocks(source_blocks); + Block decoded_block = squashBlocks(decoded_blocks); + ASSERT_EQ(receiver_stream->getTotalRows(), reference_block.rows()); + ASSERT_BLOCK_EQ(reference_block, decoded_block); + } + + void checkNoChunkInResponse( + std::vector & source_blocks, + std::vector & decoded_blocks, + std::shared_ptr & receiver_stream, + std::shared_ptr & writer) + { + /// Check Execution Summary + auto summary = receiver_stream->getRemoteExecutionSummaries(0); + ASSERT_EQ(summary->size(), 1); + ASSERT_EQ(summary->begin()->first, "Executor_0"); + ASSERT_TRUE(equalSummaries(writer->mockExecutionSummary(), summary->begin()->second)); + + /// Check Connection Info + auto infos = receiver_stream->getConnectionProfileInfos(); + ASSERT_EQ(infos.size(), 1); + ASSERT_EQ(infos[0].packets, writer->total_packets); + ASSERT_EQ(infos[0].bytes, writer->total_bytes); + + Block reference_block = squashBlocks(source_blocks); + Block decoded_block = squashBlocks(decoded_blocks); + ASSERT_BLOCK_EQ(reference_block, decoded_block); + ASSERT_EQ(receiver_stream->getTotalRows(), reference_block.rows()); + } + + std::shared_ptr makeExchangeReceiverInputStream( + PacketQueuePtr queue_ptr) + { + auto receiver = std::make_shared( + std::make_shared(queue_ptr, makeFields()), + 1, + 1, + "mock_req_id", + "mock_exchange_receiver_id", + 0); + auto receiver_stream = std::make_shared( + receiver, + "mock_req_id", + "executor_0", + 0); + return receiver_stream; + } + + void doTestNoChunkInResponse(bool empty_last_packet) + { + PacketQueuePtr queue_ptr = std::make_shared(1000); + std::vector source_blocks; + auto writer = std::make_shared(queue_ptr); + prepareQueue(writer, source_blocks, empty_last_packet); + queue_ptr->finish(); + + auto receiver_stream = makeExchangeReceiverInputStream(queue_ptr); + receiver_stream->readPrefix(); + std::vector decoded_blocks; + while (const auto & block = receiver_stream->read()) + decoded_blocks.emplace_back(block); + receiver_stream->readSuffix(); + checkNoChunkInResponse(source_blocks, decoded_blocks, receiver_stream, writer); + } + + void doTestChunkInResponse(bool empty_last_packet) + { + PacketQueuePtr queue_ptr = std::make_shared(1000); + std::vector source_blocks; + auto writer = std::make_shared(queue_ptr); + prepareQueueV2(writer, source_blocks, empty_last_packet); + queue_ptr->finish(); + auto receiver_stream = makeExchangeReceiverInputStream(queue_ptr); + receiver_stream->readPrefix(); + std::vector decoded_blocks; + while (const auto & block = receiver_stream->read()) + decoded_blocks.emplace_back(block); + receiver_stream->readSuffix(); + checkChunkInResponse(source_blocks, decoded_blocks, receiver_stream, writer); + } + + Context context; + std::unique_ptr dag_context_ptr; +}; + +TEST_F(TestTiRemoteBlockInputStream, testNoChunkInResponse) +try +{ + doTestNoChunkInResponse(true); + doTestNoChunkInResponse(false); +} +CATCH + +TEST_F(TestTiRemoteBlockInputStream, testChunksInResponse) +try +{ + doTestChunkInResponse(true); + doTestChunkInResponse(false); +} +CATCH + +} // namespace tests +} // namespace DB diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index 2eaa91c4893..6537d49dffd 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -701,9 +701,11 @@ DecodeDetail ExchangeReceiverBase::decodeChunks( if (!result) continue; detail.rows += result->rows(); - detail.produce_block_flag = true; if likely(result->rows() > 0) + { + detail.produce_block_flag = true; block_queue.push(std::move(result.value())); + } } return detail; } @@ -739,12 +741,9 @@ std::vector ExchangeReceiverBase::nextResult results.push_back(ExchangeReceiverResult::newError(recv_msg->source_index, recv_msg->req_info, recv_msg->error_ptr->msg())); return results; } - const ExchangeReceiverResult & result = toDecodeResult(block_queue, header, recv_msg, decoder_ptr); - if (unlikely(result.meet_error) || result.decode_detail.produce_block_flag) - { - results.push_back(result); + results.push_back(toDecodeResult(block_queue, header, recv_msg, decoder_ptr)); + if (unlikely(results.back().meet_error) || results.back().decode_detail.produce_block_flag) return results; - } } } } @@ -763,8 +762,11 @@ void ExchangeReceiverBase::handleUnnormalChannel( } else { + /// If there are cached data in squashDecoder, then just push the block and return EOF next iteration if (last_block && last_block->rows() > 0) { + RUNTIME_ASSERT(!results.empty()); + results.back().decode_detail.rows += last_block->rows(); block_queue.push(std::move(last_block.value())); } else From 28b9840c7c9095832947c648715b1be5e9fb70d8 Mon Sep 17 00:00:00 2001 From: yibin Date: Fri, 28 Oct 2022 09:34:14 +0800 Subject: [PATCH 04/13] Remove squash transformation from TiRemoteReadeBlockInputStream Signed-off-by: yibin --- dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp | 2 -- dbms/src/Flash/Planner/plans/PhysicalExchangeReceiver.cpp | 2 -- 2 files changed, 4 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index 77cbdbada2f..cd4dd13de98 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -29,7 +29,6 @@ #include #include #include -#include #include #include #include @@ -479,7 +478,6 @@ void DAGQueryBlockInterpreter::handleExchangeReceiver(DAGPipeline & pipeline) query_block.source_name, /*stream_id=*/enable_fine_grained_shuffle ? i : 0); exchange_receiver_io_input_streams.push_back(stream); - stream = std::make_shared(stream, 8192, 0, log->identifier()); stream->setExtraInfo(extra_info); pipeline.streams.push_back(stream); } diff --git a/dbms/src/Flash/Planner/plans/PhysicalExchangeReceiver.cpp b/dbms/src/Flash/Planner/plans/PhysicalExchangeReceiver.cpp index 03fd2804fb3..68799368860 100644 --- a/dbms/src/Flash/Planner/plans/PhysicalExchangeReceiver.cpp +++ b/dbms/src/Flash/Planner/plans/PhysicalExchangeReceiver.cpp @@ -13,7 +13,6 @@ // limitations under the License. #include -#include #include #include #include @@ -84,7 +83,6 @@ void PhysicalExchangeReceiver::transformImpl(DAGPipeline & pipeline, Context & c execId(), /*stream_id=*/enable_fine_grained_shuffle ? i : 0); exchange_receiver_io_input_streams.push_back(stream); - stream = std::make_shared(stream, 8192, 0, log->identifier()); stream->setExtraInfo(extra_info); pipeline.streams.push_back(stream); } From 3174eabeee4136da5ed7fc8dac0939bcf9e090a7 Mon Sep 17 00:00:00 2001 From: yibin Date: Fri, 28 Oct 2022 09:38:00 +0800 Subject: [PATCH 05/13] Fix a little issue Signed-off-by: yibin --- dbms/src/Flash/Mpp/ExchangeReceiver.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index 6537d49dffd..22937cdd85b 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -754,7 +754,10 @@ void ExchangeReceiverBase::handleUnnormalChannel( std::vector & results, std::unique_ptr & decoder_ptr) { - std::optional last_block = decoder_ptr->flush(); + std::optional last_block; + if (decoder_ptr) + last_block = decoder_ptr->flush(); + std::unique_lock lock(mu); if (this->state != DB::ExchangeReceiverState::NORMAL) { From b2d37b707a0f053ece8ae1112d6a2d4d0cadf891 Mon Sep 17 00:00:00 2001 From: yibin Date: Tue, 1 Nov 2022 16:23:38 +0800 Subject: [PATCH 06/13] Merge two while loop and some changes to comments Signed-off-by: yibin --- .../DataStreams/NativeBlockInputStream.cpp | 1 + dbms/src/DataStreams/NativeBlockInputStream.h | 2 +- .../DataStreams/TiRemoteBlockInputStream.h | 55 ++++++++++--------- .../Flash/Coprocessor/CHBlockChunkCodec.cpp | 2 +- dbms/src/Flash/Coprocessor/CodecUtils.cpp | 2 +- .../src/Flash/Coprocessor/CoprocessorReader.h | 31 ++++------- dbms/src/Flash/Coprocessor/DecodeDetail.h | 4 -- .../Coprocessor/IChunkDecodeAndSquash.cpp | 9 +-- .../Flash/Coprocessor/IChunkDecodeAndSquash.h | 3 +- .../tests/gtest_chunk_decode_and_squash.cpp | 4 +- .../gtest_ti_remote_block_inputstream.cpp | 5 +- dbms/src/Flash/Mpp/ExchangeReceiver.cpp | 52 ++++++------------ dbms/src/Flash/Mpp/ExchangeReceiver.h | 18 ++++-- 13 files changed, 82 insertions(+), 106 deletions(-) diff --git a/dbms/src/DataStreams/NativeBlockInputStream.cpp b/dbms/src/DataStreams/NativeBlockInputStream.cpp index 3c71a9cc29d..53fe9a225c3 100644 --- a/dbms/src/DataStreams/NativeBlockInputStream.cpp +++ b/dbms/src/DataStreams/NativeBlockInputStream.cpp @@ -20,6 +20,7 @@ #include #include #include + #include diff --git a/dbms/src/DataStreams/NativeBlockInputStream.h b/dbms/src/DataStreams/NativeBlockInputStream.h index 52ff35b86ff..296d64b1bce 100644 --- a/dbms/src/DataStreams/NativeBlockInputStream.h +++ b/dbms/src/DataStreams/NativeBlockInputStream.h @@ -17,8 +17,8 @@ #include #include #include -#include #include +#include namespace DB { diff --git a/dbms/src/DataStreams/TiRemoteBlockInputStream.h b/dbms/src/DataStreams/TiRemoteBlockInputStream.h index 8ce6e43001a..08701ad100d 100644 --- a/dbms/src/DataStreams/TiRemoteBlockInputStream.h +++ b/dbms/src/DataStreams/TiRemoteBlockInputStream.h @@ -132,36 +132,37 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream { while (true) { - auto results = remote_reader->nextResult(block_queue, sample_block, stream_id, decoder_ptr); - for (auto & result : results) + auto result = remote_reader->nextResult(block_queue, sample_block, stream_id, decoder_ptr); + if (result.meet_error) { - if (result.meet_error) - { - LOG_WARNING(log, "remote reader meets error: {}", result.error_msg); - throw Exception(result.error_msg); - } - if (result.eof) - return false; - if (result.resp != nullptr && result.resp->has_error()) + LOG_WARNING(log, "remote reader meets error: {}", result.error_msg); + throw Exception(result.error_msg); + } + if (result.eof) + return false; + if (result.resp != nullptr && result.resp->has_error()) + { + LOG_WARNING(log, "remote reader meets error: {}", result.resp->error().DebugString()); + throw Exception(result.resp->error().DebugString()); + } + /// only the last response contains execution summaries + if (result.resp != nullptr) + { + if constexpr (is_streaming_reader) { - LOG_WARNING(log, "remote reader meets error: {}", result.resp->error().DebugString()); - throw Exception(result.resp->error().DebugString()); + addRemoteExecutionSummaries(*result.resp, result.call_index, true); } - /// only the last response contains execution summaries - if (result.resp != nullptr) + else { - if constexpr (is_streaming_reader) - { - addRemoteExecutionSummaries(*result.resp, result.call_index, true); - } - else - { - addRemoteExecutionSummaries(*result.resp, 0, false); - } + addRemoteExecutionSummaries(*result.resp, 0, false); } + } - const auto & decode_detail = result.decode_detail; + const auto & decode_detail = result.decode_detail; + total_rows += decode_detail.rows; + if (!result.rowsInfoOnly) + { size_t index = 0; if constexpr (is_streaming_reader) index = result.call_index; @@ -169,17 +170,17 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream ++connection_profile_infos[index].packets; connection_profile_infos[index].bytes += decode_detail.packet_bytes; - total_rows += decode_detail.rows; LOG_TRACE( log, "recv {} rows from remote for {}, total recv row num: {}", decode_detail.rows, result.req_info, total_rows); - if (decode_detail.rows > 0) - return true; - // else continue } + + if (decode_detail.rows > 0) + return true; + // else continue } } diff --git a/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp b/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp index 46e976adafd..c7aa400ea48 100644 --- a/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp +++ b/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp @@ -15,10 +15,10 @@ #include #include #include +#include #include #include #include -#include namespace DB { diff --git a/dbms/src/Flash/Coprocessor/CodecUtils.cpp b/dbms/src/Flash/Coprocessor/CodecUtils.cpp index 174e513ed4a..b82a0c4846d 100644 --- a/dbms/src/Flash/Coprocessor/CodecUtils.cpp +++ b/dbms/src/Flash/Coprocessor/CodecUtils.cpp @@ -13,8 +13,8 @@ // limitations under the License. #include -#include #include +#include namespace DB { diff --git a/dbms/src/Flash/Coprocessor/CoprocessorReader.h b/dbms/src/Flash/Coprocessor/CoprocessorReader.h index 73dde5f7e0a..5bb59494ac5 100644 --- a/dbms/src/Flash/Coprocessor/CoprocessorReader.h +++ b/dbms/src/Flash/Coprocessor/CoprocessorReader.h @@ -47,6 +47,8 @@ struct CoprocessorReaderResult bool meet_error; String error_msg; bool eof; + /// Always false, aligned with ExchangeReceiverResult struct + bool rowsInfoOnly = false; String req_info = "cop request"; DecodeDetail decode_detail; @@ -141,49 +143,36 @@ class CoprocessorReader } // stream_id, decoder_ptr are only meaningful for ExchagneReceiver. - std::vector nextResult(std::queue & block_queue, const Block & header, size_t /*stream_id*/, std::unique_ptr & /*decoder_ptr*/) + CoprocessorReaderResult nextResult(std::queue & block_queue, const Block & header, size_t /*stream_id*/, std::unique_ptr & /*decoder_ptr*/) { - std::vector results; auto && [result, has_next] = resp_iter.next(); if (!result.error.empty()) - { - results.push_back({nullptr, true, result.error.message(), false}); - return results; - } + return {nullptr, true, result.error.message(), false}; if (!has_next) - { - results.push_back({nullptr, false, "", true}); - return results; - } + return {nullptr, false, "", true}; auto resp = std::make_shared(); if (resp->ParseFromString(result.data())) { if (resp->has_error()) { - results.push_back({nullptr, true, resp->error().DebugString(), false}); - return results; + return {nullptr, true, resp->error().DebugString(), false}; } else if (has_enforce_encode_type && resp->encode_type() != tipb::EncodeType::TypeCHBlock && resp->chunks_size() > 0) { - results.push_back( - { - nullptr, + return {nullptr, true, "Encode type of coprocessor response is not CHBlock, " "maybe the version of some TiFlash node in the cluster is not match with this one", - false}); - return results; + false}; } auto detail = decodeChunks(resp, block_queue, header, schema); - results.push_back({resp, false, "", false, detail}); - return results; + return {resp, false, "", false, detail}; } else { - results.push_back({nullptr, true, "Error while decoding coprocessor::Response", false}); - return results; + return {nullptr, true, "Error while decoding coprocessor::Response", false}; } } diff --git a/dbms/src/Flash/Coprocessor/DecodeDetail.h b/dbms/src/Flash/Coprocessor/DecodeDetail.h index 081854c4473..91851650d9e 100644 --- a/dbms/src/Flash/Coprocessor/DecodeDetail.h +++ b/dbms/src/Flash/Coprocessor/DecodeDetail.h @@ -28,9 +28,5 @@ struct DecodeDetail // Total byte size of the origin packet, even for fine grained shuffle. Int64 packet_bytes = 0; - - // For decodeAndSquash usage, several packets might be squashed to produce one block. - // This flag is true only when: during the decode process of this packet, new block is produced. - bool produce_block_flag = true; }; } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.cpp b/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.cpp index 2ec2b7285f3..ddae1e1f371 100644 --- a/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.cpp +++ b/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.cpp @@ -17,7 +17,6 @@ namespace DB { - CHBlockChunkDecodeAndSquash::CHBlockChunkDecodeAndSquash( const Block & header, size_t rows_limit_) @@ -37,7 +36,8 @@ std::optional CHBlockChunkDecodeAndSquash::decodeAndSquash(const String & return res; } - if (!accumulated_block) { + if (!accumulated_block) + { /// hard-code 1.5 here, since final column size will be more than rows_limit in most situations, /// so it should be larger than 1.0, just use 1.5 here, no special meaning Block block = codec.decodeImpl(istr, static_cast(rows_limit * 1.5)); @@ -57,16 +57,13 @@ std::optional CHBlockChunkDecodeAndSquash::decodeAndSquash(const String & ColumnWithTypeAndName column; codec.readColumnMeta(i, istr, column); - /// Data - MutableColumnPtr read_column = column.type->createColumn(); if (rows) /// If no rows, nothing to read. CHBlockChunkCodec::readData(*column.type, *(mutable_columns[i]), istr, rows); } accumulated_block->setColumns(std::move(mutable_columns)); } - RUNTIME_ASSERT(accumulated_block); - if (accumulated_block->rows() >= rows_limit) + if (accumulated_block && accumulated_block->rows() >= rows_limit) { /// Return accumulated data and reset accumulated_block res.swap(accumulated_block); diff --git a/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.h b/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.h index eb9f6077ae3..9ba8e821535 100644 --- a/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.h +++ b/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.h @@ -14,9 +14,9 @@ #pragma once +#include #include #include -#include namespace DB { @@ -37,6 +37,7 @@ class CHBlockChunkDecodeAndSquash final : public IChunkDecodeAndSquash virtual ~CHBlockChunkDecodeAndSquash() = default; std::optional decodeAndSquash(const String &); std::optional flush(); + private: CHBlockChunkCodec codec; std::optional accumulated_block; diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_chunk_decode_and_squash.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_chunk_decode_and_squash.cpp index 262c973d43d..ace861b1c41 100644 --- a/dbms/src/Flash/Coprocessor/tests/gtest_chunk_decode_and_squash.cpp +++ b/dbms/src/Flash/Coprocessor/tests/gtest_chunk_decode_and_squash.cpp @@ -12,14 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include #include -#include #include -#include #include +#include #include #include diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp index 76e36271698..5be248761c4 100644 --- a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp +++ b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp @@ -39,10 +39,7 @@ using PacketQueuePtr = std::shared_ptr; bool equalSummaries(const ExecutionSummary & left, const ExecutionSummary & right) { - return (left.concurrency == right.concurrency) && - (left.num_iterations == right.num_iterations) && - (left.num_produced_rows == right.num_produced_rows) && - (left.time_processed_ns == right.time_processed_ns); + return (left.concurrency == right.concurrency) && (left.num_iterations == right.num_iterations) && (left.num_produced_rows == right.num_produced_rows) && (left.time_processed_ns == right.time_processed_ns); } struct MockWriter diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index 22937cdd85b..0386dbe3a9c 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "ExchangeReceiver.h" - #include #include #include @@ -694,16 +692,14 @@ DecodeDetail ExchangeReceiverBase::decodeChunks( // Record total packet size even if fine grained shuffle is enabled. detail.packet_bytes = packet.ByteSizeLong(); - detail.produce_block_flag = false; for (const String * chunk : recv_msg->chunks) { auto result = decoder_ptr->decodeAndSquash(*chunk); if (!result) continue; detail.rows += result->rows(); - if likely(result->rows() > 0) + if likely (result->rows() > 0) { - detail.produce_block_flag = true; block_queue.push(std::move(result.value())); } } @@ -711,47 +707,35 @@ DecodeDetail ExchangeReceiverBase::decodeChunks( } template -std::vector ExchangeReceiverBase::nextResult( +ExchangeReceiverResult ExchangeReceiverBase::nextResult( std::queue & block_queue, const Block & header, size_t stream_id, std::unique_ptr & decoder_ptr) { - std::vector results; if (unlikely(stream_id >= msg_channels.size())) { LOG_ERROR(exc_log, "stream_id out of range, stream_id: {}, total_stream_count: {}", stream_id, msg_channels.size()); - results.push_back(ExchangeReceiverResult::newError(0, "", "stream_id out of range")); - return results; + return ExchangeReceiverResult::newError(0, "", "stream_id out of range"); } - while (true) + std::shared_ptr recv_msg; + if (msg_channels[stream_id]->pop(recv_msg) != MPMCQueueResult::OK) { - std::shared_ptr recv_msg; - if (msg_channels[stream_id]->pop(recv_msg) != MPMCQueueResult::OK) - { - handleUnnormalChannel(block_queue, results, decoder_ptr); - return results; - } - else - { - assert(recv_msg != nullptr); - if (unlikely(recv_msg->error_ptr != nullptr)) - { - results.push_back(ExchangeReceiverResult::newError(recv_msg->source_index, recv_msg->req_info, recv_msg->error_ptr->msg())); - return results; - } - results.push_back(toDecodeResult(block_queue, header, recv_msg, decoder_ptr)); - if (unlikely(results.back().meet_error) || results.back().decode_detail.produce_block_flag) - return results; - } + return handleUnnormalChannel(block_queue, decoder_ptr); + } + else + { + assert(recv_msg != nullptr); + if (unlikely(recv_msg->error_ptr != nullptr)) + return ExchangeReceiverResult::newError(recv_msg->source_index, recv_msg->req_info, recv_msg->error_ptr->msg()); + return toDecodeResult(block_queue, header, recv_msg, decoder_ptr); } } template -void ExchangeReceiverBase::handleUnnormalChannel( +ExchangeReceiverResult ExchangeReceiverBase::handleUnnormalChannel( std::queue & block_queue, - std::vector & results, std::unique_ptr & decoder_ptr) { std::optional last_block; @@ -761,20 +745,20 @@ void ExchangeReceiverBase::handleUnnormalChannel( std::unique_lock lock(mu); if (this->state != DB::ExchangeReceiverState::NORMAL) { - results.push_back(DB::ExchangeReceiverResult::newError(0, DB::ExchangeReceiverBase::name, DB::constructStatusString(this->state, this->err_msg))); + return DB::ExchangeReceiverResult::newError(0, DB::ExchangeReceiverBase::name, DB::constructStatusString(this->state, this->err_msg)); } else { /// If there are cached data in squashDecoder, then just push the block and return EOF next iteration if (last_block && last_block->rows() > 0) { - RUNTIME_ASSERT(!results.empty()); - results.back().decode_detail.rows += last_block->rows(); + const auto & result = DB::ExchangeReceiverResult::newRowsInfoOnly(last_block->rows()); block_queue.push(std::move(last_block.value())); + return result; } else { - results.push_back(DB::ExchangeReceiverResult::newEOF(DB::ExchangeReceiverBase::name)); /// live_connections == 0, msg_channel is finished, and state is NORMAL, that is the end. + return DB::ExchangeReceiverResult::newEOF(DB::ExchangeReceiverBase::name); /// live_connections == 0, msg_channel is finished, and state is NORMAL, that is the end. } } } diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.h b/dbms/src/Flash/Mpp/ExchangeReceiver.h index 15e203e19dd..cbb1dae9d4d 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.h +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.h @@ -68,6 +68,8 @@ struct ExchangeReceiverResult bool meet_error; String error_msg; bool eof; + /// Result only contains rows info in decode_detail, 'rowsInfoOnly' is true only when flushing data previous to eod + bool rowsInfoOnly; DecodeDetail decode_detail; ExchangeReceiverResult() @@ -79,6 +81,13 @@ struct ExchangeReceiverResult return {resp_, call_index_, req_info_, /*meet_error*/ false, /*error_msg*/ "", /*eof*/ false}; } + static ExchangeReceiverResult newRowsInfoOnly(size_t rows) + { + ExchangeReceiverResult result(/*resp*/ nullptr, 0, "", /*meet_error*/ false, /*error_msg*/ "", /*eof*/ false, /*rowsInfoOnly*/ true); + result.decode_detail.rows = rows; + return result; + } + static ExchangeReceiverResult newEOF(const String & req_info_) { return {/*resp*/ nullptr, 0, req_info_, /*meet_error*/ false, /*error_msg*/ "", /*eof*/ true}; @@ -96,13 +105,15 @@ struct ExchangeReceiverResult const String & req_info_ = "", bool meet_error_ = false, const String & error_msg_ = "", - bool eof_ = false) + bool eof_ = false, + bool rowsInfoOnly_ = false) : resp(resp_) , call_index(call_index_) , req_info(req_info_) , meet_error(meet_error_) , error_msg(error_msg_) , eof(eof_) + , rowsInfoOnly(rowsInfoOnly_) {} }; @@ -140,7 +151,7 @@ class ExchangeReceiverBase const DAGSchema & getOutputSchema() const { return schema; } - std::vector nextResult( + ExchangeReceiverResult nextResult( std::queue & block_queue, const Block & header, size_t stream_id, @@ -179,9 +190,8 @@ class ExchangeReceiverBase bool setEndState(ExchangeReceiverState new_state); String getStatusString(); - void handleUnnormalChannel( + ExchangeReceiverResult handleUnnormalChannel( std::queue & block_queue, - std::vector & results, std::unique_ptr & decoder_ptr); DecodeDetail decodeChunks( From 33ee23bb747bcf37adc193b3fd99a7a750071570 Mon Sep 17 00:00:00 2001 From: yibin Date: Tue, 1 Nov 2022 16:36:01 +0800 Subject: [PATCH 07/13] Fix comment Signed-off-by: yibin --- dbms/src/Flash/Mpp/ExchangeReceiver.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.h b/dbms/src/Flash/Mpp/ExchangeReceiver.h index cbb1dae9d4d..3994707c078 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.h +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.h @@ -68,7 +68,7 @@ struct ExchangeReceiverResult bool meet_error; String error_msg; bool eof; - /// Result only contains rows info in decode_detail, 'rowsInfoOnly' is true only when flushing data previous to eod + /// Result only contains rows info in decode_detail, 'rowsInfoOnly' is true only when flushing data previous to eof bool rowsInfoOnly; DecodeDetail decode_detail; From f924e626a763ff83565ad1658514c304ad97c71f Mon Sep 17 00:00:00 2001 From: yibin Date: Wed, 2 Nov 2022 10:19:25 +0800 Subject: [PATCH 08/13] Changes to comments Signed-off-by: yibin --- dbms/src/DataStreams/NativeBlockInputStream.h | 2 +- .../DataStreams/TiRemoteBlockInputStream.h | 28 +++++++++---------- .../src/Flash/Coprocessor/CoprocessorReader.h | 2 -- dbms/src/Flash/Coprocessor/DecodeDetail.h | 3 ++ dbms/src/Flash/Mpp/ExchangeReceiver.cpp | 5 +++- dbms/src/Flash/Mpp/ExchangeReceiver.h | 13 +-------- 6 files changed, 22 insertions(+), 31 deletions(-) diff --git a/dbms/src/DataStreams/NativeBlockInputStream.h b/dbms/src/DataStreams/NativeBlockInputStream.h index 296d64b1bce..418a93fe4f7 100644 --- a/dbms/src/DataStreams/NativeBlockInputStream.h +++ b/dbms/src/DataStreams/NativeBlockInputStream.h @@ -125,7 +125,7 @@ class NativeBlockInputStream : public IProfilingBlockInputStream IndexOfBlockForNativeFormat::Columns::const_iterator index_column_it; /// If an index is specified, then `istr` must be CompressedReadBufferFromFile. - CompressedReadBufferFromFile<> * istr_concrete; + CompressedReadBufferFromFile<> * istr_concrete = nullptr; PODArray avg_value_size_hints; diff --git a/dbms/src/DataStreams/TiRemoteBlockInputStream.h b/dbms/src/DataStreams/TiRemoteBlockInputStream.h index 08701ad100d..8870d2d1124 100644 --- a/dbms/src/DataStreams/TiRemoteBlockInputStream.h +++ b/dbms/src/DataStreams/TiRemoteBlockInputStream.h @@ -161,22 +161,19 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream const auto & decode_detail = result.decode_detail; total_rows += decode_detail.rows; - if (!result.rowsInfoOnly) - { - size_t index = 0; - if constexpr (is_streaming_reader) - index = result.call_index; + size_t index = 0; + if constexpr (is_streaming_reader) + index = result.call_index; - ++connection_profile_infos[index].packets; - connection_profile_infos[index].bytes += decode_detail.packet_bytes; + connection_profile_infos[index].packets += decode_detail.packets; + connection_profile_infos[index].bytes += decode_detail.packet_bytes; - LOG_TRACE( - log, - "recv {} rows from remote for {}, total recv row num: {}", - decode_detail.rows, - result.req_info, - total_rows); - } + LOG_TRACE( + log, + "recv {} rows from remote for {}, total recv row num: {}", + decode_detail.rows, + result.req_info, + total_rows); if (decode_detail.rows > 0) return true; @@ -201,8 +198,9 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream execution_summaries.resize(source_num); connection_profile_infos.resize(source_num); sample_block = Block(getColumnWithTypeAndName(toNamesAndTypes(remote_reader->getOutputSchema()))); + constexpr size_t squash_rows_limit = 8192; if constexpr (is_streaming_reader) - decoder_ptr = std::make_unique(sample_block, 8192); + decoder_ptr = std::make_unique(sample_block, squash_rows_limit); } Block getHeader() const override { return sample_block; } diff --git a/dbms/src/Flash/Coprocessor/CoprocessorReader.h b/dbms/src/Flash/Coprocessor/CoprocessorReader.h index 5bb59494ac5..cebf2c080f3 100644 --- a/dbms/src/Flash/Coprocessor/CoprocessorReader.h +++ b/dbms/src/Flash/Coprocessor/CoprocessorReader.h @@ -47,8 +47,6 @@ struct CoprocessorReaderResult bool meet_error; String error_msg; bool eof; - /// Always false, aligned with ExchangeReceiverResult struct - bool rowsInfoOnly = false; String req_info = "cop request"; DecodeDetail decode_detail; diff --git a/dbms/src/Flash/Coprocessor/DecodeDetail.h b/dbms/src/Flash/Coprocessor/DecodeDetail.h index 91851650d9e..2c501e96820 100644 --- a/dbms/src/Flash/Coprocessor/DecodeDetail.h +++ b/dbms/src/Flash/Coprocessor/DecodeDetail.h @@ -21,6 +21,9 @@ namespace DB /// Detail of the packet that decoding in TiRemoteInputStream.RemoteReader.decodeChunks() struct DecodeDetail { + // Responding packets count, usually be 1, be 0 when flush data before eof + Int64 packets = 1; + // For fine grained shuffle, each ExchangeReceiver/thread will decode its own blocks. // So this is the row number of partial blocks of the original packet. // This will be the row number of all blocks of the original packet if it's not fine grained shuffle. diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index 0386dbe3a9c..1819ca099b5 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -752,7 +752,10 @@ ExchangeReceiverResult ExchangeReceiverBase::handleUnnormalChannel( /// If there are cached data in squashDecoder, then just push the block and return EOF next iteration if (last_block && last_block->rows() > 0) { - const auto & result = DB::ExchangeReceiverResult::newRowsInfoOnly(last_block->rows()); + /// Can't get correct caller_index here, use 0 instead + auto result = ExchangeReceiverResult::newOk(nullptr, 0, ""); + result.decode_detail.packets = 0; + result.decode_detail.rows = last_block->rows(); block_queue.push(std::move(last_block.value())); return result; } diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.h b/dbms/src/Flash/Mpp/ExchangeReceiver.h index 3994707c078..37c905ffaf3 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.h +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.h @@ -68,8 +68,6 @@ struct ExchangeReceiverResult bool meet_error; String error_msg; bool eof; - /// Result only contains rows info in decode_detail, 'rowsInfoOnly' is true only when flushing data previous to eof - bool rowsInfoOnly; DecodeDetail decode_detail; ExchangeReceiverResult() @@ -81,13 +79,6 @@ struct ExchangeReceiverResult return {resp_, call_index_, req_info_, /*meet_error*/ false, /*error_msg*/ "", /*eof*/ false}; } - static ExchangeReceiverResult newRowsInfoOnly(size_t rows) - { - ExchangeReceiverResult result(/*resp*/ nullptr, 0, "", /*meet_error*/ false, /*error_msg*/ "", /*eof*/ false, /*rowsInfoOnly*/ true); - result.decode_detail.rows = rows; - return result; - } - static ExchangeReceiverResult newEOF(const String & req_info_) { return {/*resp*/ nullptr, 0, req_info_, /*meet_error*/ false, /*error_msg*/ "", /*eof*/ true}; @@ -105,15 +96,13 @@ struct ExchangeReceiverResult const String & req_info_ = "", bool meet_error_ = false, const String & error_msg_ = "", - bool eof_ = false, - bool rowsInfoOnly_ = false) + bool eof_ = false) : resp(resp_) , call_index(call_index_) , req_info(req_info_) , meet_error(meet_error_) , error_msg(error_msg_) , eof(eof_) - , rowsInfoOnly(rowsInfoOnly_) {} }; From 6e3fa2b922af322d77ad5d25df1d52442953dd0d Mon Sep 17 00:00:00 2001 From: yibin Date: Wed, 2 Nov 2022 12:40:05 +0800 Subject: [PATCH 09/13] Change reserve size logic Signed-off-by: yibin --- dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp b/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp index c7aa400ea48..75b53c90398 100644 --- a/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp +++ b/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp @@ -164,7 +164,9 @@ Block CHBlockChunkCodec::decodeImpl(ReadBuffer & istr, size_t reserve_size) /// Data MutableColumnPtr read_column = column.type->createColumn(); if (reserve_size > 0) - read_column->reserve(reserve_size); + read_column->reserve(std::max(rows, reserve_size)); + else if (rows) + read_column->reserve(rows); if (rows) /// If no rows, nothing to read. readData(*column.type, *read_column, istr, rows); From e253bbb0fd27136dd069867f63e1bcf9777342c2 Mon Sep 17 00:00:00 2001 From: yibin Date: Wed, 2 Nov 2022 14:12:02 +0800 Subject: [PATCH 10/13] Format gtest header files Signed-off-by: yibin --- .../Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp index 5be248761c4..375d1784aec 100644 --- a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp +++ b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp @@ -16,6 +16,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -24,9 +27,6 @@ #include #include -#include -#include -#include namespace DB { From 4f425fb4ef0b09b8e329f5dc702fed26d033d6fc Mon Sep 17 00:00:00 2001 From: yibin Date: Wed, 2 Nov 2022 14:32:42 +0800 Subject: [PATCH 11/13] Fix format issue Signed-off-by: yibin --- .../tests/gtest_ti_remote_block_inputstream.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp index 375d1784aec..c6598e556c1 100644 --- a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp +++ b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp @@ -16,9 +16,6 @@ #include #include #include -#include -#include -#include #include #include #include @@ -27,6 +24,10 @@ #include #include +#include +#include +#include + namespace DB { From 23a7d92586a258922269095f197fad19be147a32 Mon Sep 17 00:00:00 2001 From: yibin Date: Wed, 2 Nov 2022 14:58:27 +0800 Subject: [PATCH 12/13] Refact a little Signed-off-by: yibin --- dbms/src/DataStreams/TiRemoteBlockInputStream.h | 1 - .../Flash/Coprocessor/IChunkDecodeAndSquash.cpp | 15 ++++++++------- dbms/src/Flash/Mpp/ExchangeReceiver.cpp | 5 +---- 3 files changed, 9 insertions(+), 12 deletions(-) diff --git a/dbms/src/DataStreams/TiRemoteBlockInputStream.h b/dbms/src/DataStreams/TiRemoteBlockInputStream.h index 8870d2d1124..83290e8a422 100644 --- a/dbms/src/DataStreams/TiRemoteBlockInputStream.h +++ b/dbms/src/DataStreams/TiRemoteBlockInputStream.h @@ -219,7 +219,6 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream if (!fetchRemoteResult()) return {}; } - // todo should merge some blocks to make sure the output block is big enough Block block = block_queue.front(); block_queue.pop(); return block; diff --git a/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.cpp b/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.cpp index ddae1e1f371..8ef5e6a910c 100644 --- a/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.cpp +++ b/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.cpp @@ -51,16 +51,17 @@ std::optional CHBlockChunkDecodeAndSquash::decodeAndSquash(const String & size_t rows = 0; codec.readBlockMeta(istr, columns, rows); - auto mutable_columns = accumulated_block->mutateColumns(); - for (size_t i = 0; i < columns; ++i) + if (rows) { - ColumnWithTypeAndName column; - codec.readColumnMeta(i, istr, column); - - if (rows) /// If no rows, nothing to read. + auto mutable_columns = accumulated_block->mutateColumns(); + for (size_t i = 0; i < columns; ++i) + { + ColumnWithTypeAndName column; + codec.readColumnMeta(i, istr, column); CHBlockChunkCodec::readData(*column.type, *(mutable_columns[i]), istr, rows); + } + accumulated_block->setColumns(std::move(mutable_columns)); } - accumulated_block->setColumns(std::move(mutable_columns)); } if (accumulated_block && accumulated_block->rows() >= rows_limit) diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index 1819ca099b5..874b1f027d3 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -738,10 +738,7 @@ ExchangeReceiverResult ExchangeReceiverBase::handleUnnormalChannel( std::queue & block_queue, std::unique_ptr & decoder_ptr) { - std::optional last_block; - if (decoder_ptr) - last_block = decoder_ptr->flush(); - + std::optional last_block = decoder_ptr->flush(); std::unique_lock lock(mu); if (this->state != DB::ExchangeReceiverState::NORMAL) { From e8594cff811c266cd97386197871274c2a5fc223 Mon Sep 17 00:00:00 2001 From: yibin Date: Wed, 2 Nov 2022 16:15:33 +0800 Subject: [PATCH 13/13] Little update Signed-off-by: yibin --- dbms/src/Flash/Coprocessor/CodecUtils.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/CodecUtils.cpp b/dbms/src/Flash/Coprocessor/CodecUtils.cpp index b82a0c4846d..ceffcbdc7c6 100644 --- a/dbms/src/Flash/Coprocessor/CodecUtils.cpp +++ b/dbms/src/Flash/Coprocessor/CodecUtils.cpp @@ -27,7 +27,7 @@ namespace CodecUtils { void checkColumnSize(size_t expected, size_t actual) { - if (expected != actual) + if unlikely (expected != actual) throw Exception( fmt::format("NativeBlockInputStream schema mismatch, expected {}, actual {}.", expected, actual), ErrorCodes::LOGICAL_ERROR); @@ -35,7 +35,7 @@ void checkColumnSize(size_t expected, size_t actual) void checkDataTypeName(size_t column_index, const String & expected, const String & actual) { - if (expected != actual) + if unlikely (expected != actual) throw Exception( fmt::format( "NativeBlockInputStream schema mismatch at column {}, expected {}, actual {}",