From fa590d818c6a22787fbbe168298e79162cf41def Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 22 May 2024 18:02:19 +0800 Subject: [PATCH 01/25] add notify_future for mpptunnel --- dbms/src/Common/GRPCQueue.h | 6 ++++++ dbms/src/Flash/Mpp/LocalRequestHandler.h | 3 +++ dbms/src/Flash/Mpp/MPPTunnel.h | 24 +++++++++++++++++++++-- dbms/src/Flash/Mpp/ReceivedMessageQueue.h | 3 +++ 4 files changed, 34 insertions(+), 2 deletions(-) diff --git a/dbms/src/Common/GRPCQueue.h b/dbms/src/Common/GRPCQueue.h index 333c94a5381..497dab3a4ad 100644 --- a/dbms/src/Common/GRPCQueue.h +++ b/dbms/src/Common/GRPCQueue.h @@ -139,6 +139,9 @@ class GRPCSendQueue bool isWritable() const { return send_queue.isWritable(); } + void registerPipeReadTask(TaskPtr && task) { send_queue.registerPipeReadTask(std::move(task)); } + void registerPipeWriteTask(TaskPtr && task) { send_queue.registerPipeWriteTask(std::move(task)); } + private: friend class tests::TestGRPCSendQueue; @@ -297,6 +300,9 @@ class GRPCRecvQueue bool isWritable() const { return recv_queue.isWritable(); } + void registerPipeReadTask(TaskPtr && task) { recv_queue.registerPipeReadTask(std::move(task)); } + void registerPipeWriteTask(TaskPtr && task) { recv_queue.registerPipeWriteTask(std::move(task)); } + private: friend class tests::TestGRPCRecvQueue; diff --git a/dbms/src/Flash/Mpp/LocalRequestHandler.h b/dbms/src/Flash/Mpp/LocalRequestHandler.h index 20f52acc5b1..a6422d79880 100644 --- a/dbms/src/Flash/Mpp/LocalRequestHandler.h +++ b/dbms/src/Flash/Mpp/LocalRequestHandler.h @@ -42,6 +42,9 @@ struct LocalRequestHandler bool isWritable() const { return msg_queue->isWritable(); } + void registerPipeReadTask(TaskPtr && task) const { msg_queue->registerPipeReadTask(std::move(task)); } + void registerPipeWriteTask(TaskPtr && task) const { msg_queue->registerPipeWriteTask(std::move(task)); } + void writeDone(bool meet_error, const String & local_err_msg) const { notify_write_done(meet_error, local_err_msg); diff --git a/dbms/src/Flash/Mpp/MPPTunnel.h b/dbms/src/Flash/Mpp/MPPTunnel.h index 58062b7a170..231bcee32e4 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.h +++ b/dbms/src/Flash/Mpp/MPPTunnel.h @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -92,10 +93,12 @@ enum class TunnelSenderMode /// TunnelSender is responsible for consuming data from Tunnel's internal send_queue and do the actual sending work /// After TunnelSend finished its work, either normally or abnormally, set ConsumerState to inform Tunnel -class TunnelSender : private boost::noncopyable +class TunnelSender + : private boost::noncopyable + , public NotifyFuture { public: - virtual ~TunnelSender() = default; + ~TunnelSender() override = default; TunnelSender( MemoryTrackerPtr & memory_tracker_, const LoggerPtr & log_, @@ -193,6 +196,8 @@ class SyncTunnelSender : public TunnelSender bool isWritable() const override { return send_queue.isWritable(); } + void registerTask(TaskPtr && task) override { send_queue.registerPipeWriteTask(std::move(task)); } + private: friend class tests::TestMPPTunnel; void sendJob(PacketWriter * writer); @@ -254,6 +259,8 @@ class AsyncTunnelSender : public TunnelSender void subDataSizeMetric(size_t size) { ::DB::MPPTunnelMetric::subDataSizeMetric(*data_size_in_queue, size); } + void registerTask(TaskPtr && task) override { queue.registerPipeWriteTask(std::move(task)); } + private: GRPCSendQueue queue; }; @@ -311,6 +318,17 @@ class LocalTunnelSenderV2 : public TunnelSender } } + void registerTask(TaskPtr && task) override + { + if constexpr (local_only) + local_request_handler.registerPipeWriteTask(std::move(task)); + else + { + std::lock_guard lock(mu); + local_request_handler.registerPipeWriteTask(std::move(task)); + } + } + private: friend class tests::TestMPPTunnel; @@ -405,6 +423,8 @@ class LocalTunnelSenderV1 : public TunnelSender bool isWritable() const override { return send_queue.isWritable(); } + void registerTask(TaskPtr && task) override { send_queue.registerPipeWriteTask(std::move(task)); } + private: bool cancel_reason_sent = false; LooseBoundedMPMCQueue send_queue; diff --git a/dbms/src/Flash/Mpp/ReceivedMessageQueue.h b/dbms/src/Flash/Mpp/ReceivedMessageQueue.h index 6fd41463d12..c975e4d2ab1 100644 --- a/dbms/src/Flash/Mpp/ReceivedMessageQueue.h +++ b/dbms/src/Flash/Mpp/ReceivedMessageQueue.h @@ -99,6 +99,9 @@ class ReceivedMessageQueue bool isWritable() const { return grpc_recv_queue.isWritable(); } + void registerPipeReadTask(TaskPtr && task) { grpc_recv_queue.registerPipeReadTask(std::move(task)); } + void registerPipeWriteTask(TaskPtr && task) { grpc_recv_queue.registerPipeWriteTask(std::move(task)); } + #ifndef DBMS_PUBLIC_GTEST private: #endif From 62d3add9e182a604aa579004fe36bab39acef244 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 22 May 2024 18:31:09 +0800 Subject: [PATCH 02/25] add for writer --- .../src/Flash/Coprocessor/DAGResponseWriter.h | 3 ++ dbms/src/Flash/Coprocessor/StreamWriter.h | 7 +++- .../StreamingDAGResponseWriter.cpp | 6 +++ .../Coprocessor/StreamingDAGResponseWriter.h | 1 + dbms/src/Flash/Coprocessor/WaitResult.h | 25 ++++++++++++ .../tests/gtest_streaming_writer.cpp | 3 +- .../gtest_ti_remote_block_inputstream.cpp | 1 + .../Mpp/BroadcastOrPassThroughWriter.cpp | 6 +++ .../Flash/Mpp/BroadcastOrPassThroughWriter.h | 1 + .../Flash/Mpp/FineGrainedShuffleWriter.cpp | 6 +++ dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h | 1 + dbms/src/Flash/Mpp/HashPartitionWriter.cpp | 6 +++ dbms/src/Flash/Mpp/HashPartitionWriter.h | 1 + dbms/src/Flash/Mpp/MPPTunnel.cpp | 39 +++++++++++++++++++ dbms/src/Flash/Mpp/MPPTunnel.h | 3 ++ dbms/src/Flash/Mpp/MPPTunnelSet.cpp | 12 ++++++ dbms/src/Flash/Mpp/MPPTunnelSet.h | 2 + dbms/src/Flash/Mpp/MPPTunnelSetWriter.h | 7 ++++ .../Mpp/tests/gtest_mpp_exchange_writer.cpp | 3 +- 19 files changed, 129 insertions(+), 4 deletions(-) create mode 100644 dbms/src/Flash/Coprocessor/WaitResult.h diff --git a/dbms/src/Flash/Coprocessor/DAGResponseWriter.h b/dbms/src/Flash/Coprocessor/DAGResponseWriter.h index e4be0eb59b9..e8ac100a59e 100644 --- a/dbms/src/Flash/Coprocessor/DAGResponseWriter.h +++ b/dbms/src/Flash/Coprocessor/DAGResponseWriter.h @@ -17,6 +17,7 @@ #include #include #include +#include namespace DB { @@ -37,6 +38,8 @@ class DAGResponseWriter // ``` virtual bool isWritable() const { throw Exception("Unsupport"); } + virtual WaitResult waitForWritable() const { throw Exception("Unsupport"); } + /// flush cached blocks for batch writer virtual void flush() = 0; virtual ~DAGResponseWriter() = default; diff --git a/dbms/src/Flash/Coprocessor/StreamWriter.h b/dbms/src/Flash/Coprocessor/StreamWriter.h index 4eb62b27144..40061e2ed0f 100644 --- a/dbms/src/Flash/Coprocessor/StreamWriter.h +++ b/dbms/src/Flash/Coprocessor/StreamWriter.h @@ -16,6 +16,7 @@ #include #include +#include #ifdef __clang__ #pragma clang diagnostic push #pragma clang diagnostic ignored "-Wdeprecated-declarations" @@ -57,7 +58,8 @@ struct CopStreamWriter if (!writer->Write(resp)) throw Exception("Failed to write resp"); } - bool isWritable() const { throw Exception("Unsupport async write"); } + static bool isWritable() { throw Exception("Unsupport async write"); } + static WaitResult waitForWritable() { throw Exception("Unsupport async write"); } }; struct BatchCopStreamWriter @@ -81,7 +83,8 @@ struct BatchCopStreamWriter if (!writer->Write(resp)) throw Exception("Failed to write resp"); } - bool isWritable() const { throw Exception("Unsupport async write"); } + static bool isWritable() { throw Exception("Unsupport async write"); } + static WaitResult waitForWritable() { throw Exception("Unsupport async write"); } }; using CopStreamWriterPtr = std::shared_ptr; diff --git a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp index b1169ca46bc..a2dc4e42f17 100644 --- a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp +++ b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp @@ -73,6 +73,12 @@ bool StreamingDAGResponseWriter::isWritable() const return writer->isWritable(); } +template +WaitResult StreamingDAGResponseWriter::waitForWritable() const +{ + return writer->waitForWritable(); +} + template void StreamingDAGResponseWriter::write(const Block & block) { diff --git a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h index e6f1e61b59e..56b355f5af7 100644 --- a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h +++ b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h @@ -38,6 +38,7 @@ class StreamingDAGResponseWriter : public DAGResponseWriter DAGContext & dag_context_); void write(const Block & block) override; bool isWritable() const override; + WaitResult waitForWritable() const override; void flush() override; private: diff --git a/dbms/src/Flash/Coprocessor/WaitResult.h b/dbms/src/Flash/Coprocessor/WaitResult.h new file mode 100644 index 00000000000..ed45e89111e --- /dev/null +++ b/dbms/src/Flash/Coprocessor/WaitResult.h @@ -0,0 +1,25 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +namespace DB +{ +enum class WaitResult +{ + Ready, + WaitForPolling, + WaitForNotify +}; +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_streaming_writer.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_streaming_writer.cpp index c337f72de60..cf9b5ddee34 100644 --- a/dbms/src/Flash/Coprocessor/tests/gtest_streaming_writer.cpp +++ b/dbms/src/Flash/Coprocessor/tests/gtest_streaming_writer.cpp @@ -93,7 +93,8 @@ struct MockStreamWriter {} void write(tipb::SelectResponse & response) { checker(response); } - bool isWritable() const { throw Exception("Unsupport async write"); } + static bool isWritable() { throw Exception("Unsupport async write"); } + static WaitResult waitForWritable() { throw Exception("Unsupport async write"); } private: MockStreamWriterChecker checker; diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp index 9448bd8477b..0211faa2c5b 100644 --- a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp +++ b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp @@ -148,6 +148,7 @@ struct MockWriter } static uint16_t getPartitionNum() { return 1; } static bool isWritable() { throw Exception("Unsupport async write"); } + static WaitResult waitForWritable() { throw Exception("Unsupport async write"); } std::vector result_field_types; diff --git a/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp b/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp index c2b42854214..06c7d20e0c5 100644 --- a/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp +++ b/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp @@ -75,6 +75,12 @@ bool BroadcastOrPassThroughWriter::isWritable() const return writer->isWritable(); } +template +WaitResult BroadcastOrPassThroughWriter::waitForWritable() const +{ + return writer->waitForWritable(); +} + template void BroadcastOrPassThroughWriter::write(const Block & block) { diff --git a/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.h b/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.h index 9a28d13a461..e0a9d733e24 100644 --- a/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.h +++ b/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.h @@ -38,6 +38,7 @@ class BroadcastOrPassThroughWriter : public DAGResponseWriter tipb::ExchangeType exchange_type_); void write(const Block & block) override; bool isWritable() const override; + WaitResult waitForWritable() const override; void flush() override; private: diff --git a/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp index cff3fe27e56..0893b7b77f1 100644 --- a/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp +++ b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp @@ -101,6 +101,12 @@ bool FineGrainedShuffleWriter::isWritable() const return writer->isWritable(); } +template +WaitResult FineGrainedShuffleWriter::waitForWritable() const +{ + return writer->waitForWritable(); +} + template void FineGrainedShuffleWriter::write(const Block & block) { diff --git a/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h index a2bf4cf40d6..6cf4fc3e326 100644 --- a/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h +++ b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h @@ -46,6 +46,7 @@ class FineGrainedShuffleWriter : public DAGResponseWriter void prepare(const Block & sample_block) override; void write(const Block & block) override; bool isWritable() const override; + WaitResult waitForWritable() const override; void flush() override; private: diff --git a/dbms/src/Flash/Mpp/HashPartitionWriter.cpp b/dbms/src/Flash/Mpp/HashPartitionWriter.cpp index a46c5d3718a..eb976aed4c9 100644 --- a/dbms/src/Flash/Mpp/HashPartitionWriter.cpp +++ b/dbms/src/Flash/Mpp/HashPartitionWriter.cpp @@ -98,6 +98,12 @@ bool HashPartitionWriter::isWritable() const return writer->isWritable(); } +template +WaitResult HashPartitionWriter::waitForWritable() const +{ + return writer->waitForWritable(); +} + template void HashPartitionWriter::writeImplV1(const Block & block) { diff --git a/dbms/src/Flash/Mpp/HashPartitionWriter.h b/dbms/src/Flash/Mpp/HashPartitionWriter.h index b0d31b7b2e7..d11dd4deb9a 100644 --- a/dbms/src/Flash/Mpp/HashPartitionWriter.h +++ b/dbms/src/Flash/Mpp/HashPartitionWriter.h @@ -39,6 +39,7 @@ class HashPartitionWriter : public DAGResponseWriter tipb::CompressionMode compression_mode_); void write(const Block & block) override; bool isWritable() const override; + WaitResult waitForWritable() const override; void flush() override; private: diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index c0fe94efb52..3226e353f85 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -22,6 +22,7 @@ #include #include +#include "Flash/Pipeline/Schedule/Tasks/NotifyFuture.h" namespace DB { @@ -413,6 +414,44 @@ bool MPPTunnel::isWritable() const } } +WaitResult MPPTunnel::waitForWritable() const +{ + std::unique_lock lk(mu); + switch (status) + { + case TunnelStatus::Unconnected: + { + if (timeout.count() > 0) + { + fiu_do_on(FailPoints::random_tunnel_wait_timeout_failpoint, + throw Exception(fmt::format("{} is timeout", tunnel_id));); + if (unlikely(!timeout_stopwatch)) + timeout_stopwatch.emplace(CLOCK_MONOTONIC_COARSE); + if (unlikely(timeout_stopwatch->elapsed() > timeout_nanoseconds)) + throw Exception(fmt::format("{} is timeout", tunnel_id)); + } + return WaitResult::WaitForPolling; + } + case TunnelStatus::Connected: + case TunnelStatus::WaitingForSenderFinish: + RUNTIME_CHECK_MSG(tunnel_sender != nullptr, "write to tunnel {} which is already closed.", tunnel_id); + if (!tunnel_sender->isWritable()) + { + setNotifyFuture(tunnel_sender); + return WaitResult::WaitForNotify; + } + return WaitResult::Ready; + case TunnelStatus::Finished: + RUNTIME_CHECK_MSG(tunnel_sender != nullptr, "write to tunnel {} which is already closed.", tunnel_id); + throw Exception(fmt::format( + "write to tunnel {} which is already closed, {}", + tunnel_id, + tunnel_sender->isConsumerFinished() ? tunnel_sender->getConsumerFinishMsg() : "")); + default: + RUNTIME_ASSERT(false, log, "Unsupported tunnel status: {}", magic_enum::enum_name(status)); + } +} + std::string_view MPPTunnel::statusToString() { return magic_enum::enum_name(status); diff --git a/dbms/src/Flash/Mpp/MPPTunnel.h b/dbms/src/Flash/Mpp/MPPTunnel.h index 231bcee32e4..47d513c7812 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.h +++ b/dbms/src/Flash/Mpp/MPPTunnel.h @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -500,6 +501,8 @@ class MPPTunnel : private boost::noncopyable void forceWrite(TrackedMppDataPacketPtr && data); bool isWritable() const; + WaitResult waitForWritable() const; + // finish the writing, and wait until the sender finishes. void writeDone(); diff --git a/dbms/src/Flash/Mpp/MPPTunnelSet.cpp b/dbms/src/Flash/Mpp/MPPTunnelSet.cpp index 6078ec8f2a4..a499444075d 100644 --- a/dbms/src/Flash/Mpp/MPPTunnelSet.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnelSet.cpp @@ -18,6 +18,7 @@ #include #include #include +#include "Flash/Mpp/MPPTunnel.h" namespace DB { @@ -78,6 +79,17 @@ bool MPPTunnelSetBase::isWritable() const return true; } +template +WaitResult MPPTunnelSetBase::waitForWritable() const +{ + for (const auto & tunnel : tunnels) + { + if (auto res = tunnel->waitForWritable(); res != WaitResult::Ready) + return res; + } + return WaitResult::Ready; +} + template void MPPTunnelSetBase::registerTunnel(const MPPTaskId & receiver_task_id, const TunnelPtr & tunnel) { diff --git a/dbms/src/Flash/Mpp/MPPTunnelSet.h b/dbms/src/Flash/Mpp/MPPTunnelSet.h index ee2c65768f7..606a1a0281b 100644 --- a/dbms/src/Flash/Mpp/MPPTunnelSet.h +++ b/dbms/src/Flash/Mpp/MPPTunnelSet.h @@ -62,6 +62,8 @@ class MPPTunnelSetBase : private boost::noncopyable bool isWritable() const; + WaitResult waitForWritable() const; + bool isLocal(size_t index) const; private: diff --git a/dbms/src/Flash/Mpp/MPPTunnelSetWriter.h b/dbms/src/Flash/Mpp/MPPTunnelSetWriter.h index ce923a88410..6bb06a0c190 100644 --- a/dbms/src/Flash/Mpp/MPPTunnelSetWriter.h +++ b/dbms/src/Flash/Mpp/MPPTunnelSetWriter.h @@ -71,6 +71,8 @@ class MPPTunnelSetWriterBase : private boost::noncopyable virtual bool isWritable() const = 0; + virtual WaitResult waitForWritable() const = 0; + protected: virtual void writeToTunnel(TrackedMppDataPacketPtr && data, size_t index) = 0; virtual void writeToTunnel(tipb::SelectResponse & response, size_t index) = 0; @@ -94,6 +96,9 @@ class SyncMPPTunnelSetWriter : public MPPTunnelSetWriterBase // For sync writer, `isWritable` will not be called, so an exception is thrown here. bool isWritable() const override { throw Exception("Unsupport sync writer"); } + // For sync writer, `waitForWritable` will not be called, so an exception is thrown here. + WaitResult waitForWritable() const override { throw Exception("Unsupport sync writer"); } + protected: void writeToTunnel(TrackedMppDataPacketPtr && data, size_t index) override; void writeToTunnel(tipb::SelectResponse & response, size_t index) override; @@ -112,6 +117,8 @@ class AsyncMPPTunnelSetWriter : public MPPTunnelSetWriterBase bool isWritable() const override { return mpp_tunnel_set->isWritable(); } + WaitResult waitForWritable() const override { return mpp_tunnel_set->waitForWritable(); } + protected: void writeToTunnel(TrackedMppDataPacketPtr && data, size_t index) override; void writeToTunnel(tipb::SelectResponse & response, size_t index) override; diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp index fd3b3593321..31520093a78 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp @@ -224,7 +224,8 @@ struct MockExchangeWriter // make only part 0 use local tunnel return index == 0; } - bool isWritable() const { throw Exception("Unsupport async write"); } + static bool isWritable() { throw Exception("Unsupport async write"); } + static WaitResult waitForWritable() { throw Exception("Unsupport async write"); } private: MockExchangeWriterChecker checker; From e7d8c9be22c02ab2127651f7530d4c67f94b355a Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 22 May 2024 18:35:45 +0800 Subject: [PATCH 03/25] for exchange sender --- dbms/src/Operators/ExchangeSenderSinkOp.cpp | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/dbms/src/Operators/ExchangeSenderSinkOp.cpp b/dbms/src/Operators/ExchangeSenderSinkOp.cpp index 664e22e3cc5..4622e97d864 100644 --- a/dbms/src/Operators/ExchangeSenderSinkOp.cpp +++ b/dbms/src/Operators/ExchangeSenderSinkOp.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include "Flash/Coprocessor/WaitResult.h" namespace DB { @@ -41,12 +42,21 @@ OperatorStatus ExchangeSenderSinkOp::writeImpl(Block && block) OperatorStatus ExchangeSenderSinkOp::prepareImpl() { - return writer->isWritable() ? OperatorStatus::NEED_INPUT : OperatorStatus::WAITING; + return awaitImpl(); } OperatorStatus ExchangeSenderSinkOp::awaitImpl() { - return writer->isWritable() ? OperatorStatus::NEED_INPUT : OperatorStatus::WAITING; + auto res = writer->waitForWritable(); + switch (res) + { + case WaitResult::Ready: + return OperatorStatus::NEED_INPUT; + case WaitResult::WaitForPolling: + return OperatorStatus::WAITING; + case WaitResult::WaitForNotify: + return OperatorStatus::WAIT_FOR_NOTIFY; + } } } // namespace DB From 2ffee76722158090b5d3e698bdeda243903c2f36 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 22 May 2024 18:52:03 +0800 Subject: [PATCH 04/25] remove writable --- .../src/Flash/Coprocessor/DAGResponseWriter.h | 9 +++-- dbms/src/Flash/Coprocessor/StreamWriter.h | 2 -- .../StreamingDAGResponseWriter.cpp | 6 ---- .../Coprocessor/StreamingDAGResponseWriter.h | 1 - .../tests/gtest_streaming_writer.cpp | 1 - .../gtest_ti_remote_block_inputstream.cpp | 1 - .../Mpp/BroadcastOrPassThroughWriter.cpp | 6 ---- .../Flash/Mpp/BroadcastOrPassThroughWriter.h | 1 - .../Flash/Mpp/FineGrainedShuffleWriter.cpp | 6 ---- dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h | 1 - dbms/src/Flash/Mpp/HashPartitionWriter.cpp | 6 ---- dbms/src/Flash/Mpp/HashPartitionWriter.h | 1 - dbms/src/Flash/Mpp/MPPTunnel.cpp | 34 ------------------- dbms/src/Flash/Mpp/MPPTunnel.h | 9 +++-- dbms/src/Flash/Mpp/MPPTunnelSet.cpp | 12 ------- dbms/src/Flash/Mpp/MPPTunnelSet.h | 2 -- dbms/src/Flash/Mpp/MPPTunnelSetWriter.h | 7 ---- .../Mpp/tests/gtest_mpp_exchange_writer.cpp | 1 - dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp | 8 ++--- dbms/src/Operators/ExchangeSenderSinkOp.cpp | 18 ++++++---- dbms/src/Operators/ExchangeSenderSinkOp.h | 3 ++ 21 files changed, 26 insertions(+), 109 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGResponseWriter.h b/dbms/src/Flash/Coprocessor/DAGResponseWriter.h index e8ac100a59e..b9a756e365d 100644 --- a/dbms/src/Flash/Coprocessor/DAGResponseWriter.h +++ b/dbms/src/Flash/Coprocessor/DAGResponseWriter.h @@ -15,9 +15,9 @@ #pragma once #include +#include #include #include -#include namespace DB { @@ -31,13 +31,12 @@ class DAGResponseWriter virtual void prepare(const Block &){}; virtual void write(const Block & block) = 0; - // For async writer, `isWritable` need to be called before calling `write`. + // For async writer, `waitForWritable` need to be called before calling `write`. // ``` - // while (!isWritable()) {} + // auto res = waitForWritable(); + // switch (res) case... // write(block); // ``` - virtual bool isWritable() const { throw Exception("Unsupport"); } - virtual WaitResult waitForWritable() const { throw Exception("Unsupport"); } /// flush cached blocks for batch writer diff --git a/dbms/src/Flash/Coprocessor/StreamWriter.h b/dbms/src/Flash/Coprocessor/StreamWriter.h index 40061e2ed0f..41383ee49c2 100644 --- a/dbms/src/Flash/Coprocessor/StreamWriter.h +++ b/dbms/src/Flash/Coprocessor/StreamWriter.h @@ -58,7 +58,6 @@ struct CopStreamWriter if (!writer->Write(resp)) throw Exception("Failed to write resp"); } - static bool isWritable() { throw Exception("Unsupport async write"); } static WaitResult waitForWritable() { throw Exception("Unsupport async write"); } }; @@ -83,7 +82,6 @@ struct BatchCopStreamWriter if (!writer->Write(resp)) throw Exception("Failed to write resp"); } - static bool isWritable() { throw Exception("Unsupport async write"); } static WaitResult waitForWritable() { throw Exception("Unsupport async write"); } }; diff --git a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp index a2dc4e42f17..a6f39cb25dc 100644 --- a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp +++ b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp @@ -67,12 +67,6 @@ void StreamingDAGResponseWriter::flush() encodeThenWriteBlocks(); } -template -bool StreamingDAGResponseWriter::isWritable() const -{ - return writer->isWritable(); -} - template WaitResult StreamingDAGResponseWriter::waitForWritable() const { diff --git a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h index 56b355f5af7..61ca9a71517 100644 --- a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h +++ b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h @@ -37,7 +37,6 @@ class StreamingDAGResponseWriter : public DAGResponseWriter Int64 batch_send_min_limit_, DAGContext & dag_context_); void write(const Block & block) override; - bool isWritable() const override; WaitResult waitForWritable() const override; void flush() override; diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_streaming_writer.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_streaming_writer.cpp index cf9b5ddee34..ed897c94fa0 100644 --- a/dbms/src/Flash/Coprocessor/tests/gtest_streaming_writer.cpp +++ b/dbms/src/Flash/Coprocessor/tests/gtest_streaming_writer.cpp @@ -93,7 +93,6 @@ struct MockStreamWriter {} void write(tipb::SelectResponse & response) { checker(response); } - static bool isWritable() { throw Exception("Unsupport async write"); } static WaitResult waitForWritable() { throw Exception("Unsupport async write"); } private: diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp index 0211faa2c5b..7ff5d30129b 100644 --- a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp +++ b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp @@ -147,7 +147,6 @@ struct MockWriter queue->push(tracked_packet); } static uint16_t getPartitionNum() { return 1; } - static bool isWritable() { throw Exception("Unsupport async write"); } static WaitResult waitForWritable() { throw Exception("Unsupport async write"); } std::vector result_field_types; diff --git a/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp b/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp index 06c7d20e0c5..b4af6b40530 100644 --- a/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp +++ b/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp @@ -69,12 +69,6 @@ void BroadcastOrPassThroughWriter::flush() writeBlocks(); } -template -bool BroadcastOrPassThroughWriter::isWritable() const -{ - return writer->isWritable(); -} - template WaitResult BroadcastOrPassThroughWriter::waitForWritable() const { diff --git a/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.h b/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.h index e0a9d733e24..be615c4c21c 100644 --- a/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.h +++ b/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.h @@ -37,7 +37,6 @@ class BroadcastOrPassThroughWriter : public DAGResponseWriter tipb::CompressionMode compression_mode_, tipb::ExchangeType exchange_type_); void write(const Block & block) override; - bool isWritable() const override; WaitResult waitForWritable() const override; void flush() override; diff --git a/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp index 0893b7b77f1..200f60d4655 100644 --- a/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp +++ b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp @@ -95,12 +95,6 @@ void FineGrainedShuffleWriter::flush() batchWriteFineGrainedShuffle(); } -template -bool FineGrainedShuffleWriter::isWritable() const -{ - return writer->isWritable(); -} - template WaitResult FineGrainedShuffleWriter::waitForWritable() const { diff --git a/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h index 6cf4fc3e326..b050245cdca 100644 --- a/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h +++ b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h @@ -45,7 +45,6 @@ class FineGrainedShuffleWriter : public DAGResponseWriter tipb::CompressionMode compression_mode_); void prepare(const Block & sample_block) override; void write(const Block & block) override; - bool isWritable() const override; WaitResult waitForWritable() const override; void flush() override; diff --git a/dbms/src/Flash/Mpp/HashPartitionWriter.cpp b/dbms/src/Flash/Mpp/HashPartitionWriter.cpp index eb976aed4c9..e567ab5ef89 100644 --- a/dbms/src/Flash/Mpp/HashPartitionWriter.cpp +++ b/dbms/src/Flash/Mpp/HashPartitionWriter.cpp @@ -92,12 +92,6 @@ void HashPartitionWriter::flush() } } -template -bool HashPartitionWriter::isWritable() const -{ - return writer->isWritable(); -} - template WaitResult HashPartitionWriter::waitForWritable() const { diff --git a/dbms/src/Flash/Mpp/HashPartitionWriter.h b/dbms/src/Flash/Mpp/HashPartitionWriter.h index d11dd4deb9a..8e36d28234d 100644 --- a/dbms/src/Flash/Mpp/HashPartitionWriter.h +++ b/dbms/src/Flash/Mpp/HashPartitionWriter.h @@ -38,7 +38,6 @@ class HashPartitionWriter : public DAGResponseWriter MPPDataPacketVersion data_codec_version_, tipb::CompressionMode compression_mode_); void write(const Block & block) override; - bool isWritable() const override; WaitResult waitForWritable() const override; void flush() override; diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index 3226e353f85..9a16322d241 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -22,7 +22,6 @@ #include #include -#include "Flash/Pipeline/Schedule/Tasks/NotifyFuture.h" namespace DB { @@ -381,39 +380,6 @@ void MPPTunnel::waitUntilConnectedOrFinished(std::unique_lock & lk) throw Exception(fmt::format("MPPTunnel {} can not be connected because MPPTask is cancelled", tunnel_id)); } -bool MPPTunnel::isWritable() const -{ - std::unique_lock lk(mu); - switch (status) - { - case TunnelStatus::Unconnected: - { - if (timeout.count() > 0) - { - fiu_do_on(FailPoints::random_tunnel_wait_timeout_failpoint, - throw Exception(fmt::format("{} is timeout", tunnel_id));); - if (unlikely(!timeout_stopwatch)) - timeout_stopwatch.emplace(CLOCK_MONOTONIC_COARSE); - if (unlikely(timeout_stopwatch->elapsed() > timeout_nanoseconds)) - throw Exception(fmt::format("{} is timeout", tunnel_id)); - } - return false; - } - case TunnelStatus::Connected: - case TunnelStatus::WaitingForSenderFinish: - RUNTIME_CHECK_MSG(tunnel_sender != nullptr, "write to tunnel {} which is already closed.", tunnel_id); - return tunnel_sender->isWritable(); - case TunnelStatus::Finished: - RUNTIME_CHECK_MSG(tunnel_sender != nullptr, "write to tunnel {} which is already closed.", tunnel_id); - throw Exception(fmt::format( - "write to tunnel {} which is already closed, {}", - tunnel_id, - tunnel_sender->isConsumerFinished() ? tunnel_sender->getConsumerFinishMsg() : "")); - default: - RUNTIME_ASSERT(false, log, "Unsupported tunnel status: {}", magic_enum::enum_name(status)); - } -} - WaitResult MPPTunnel::waitForWritable() const { std::unique_lock lk(mu); diff --git a/dbms/src/Flash/Mpp/MPPTunnel.h b/dbms/src/Flash/Mpp/MPPTunnel.h index 47d513c7812..6eb57fd14b7 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.h +++ b/dbms/src/Flash/Mpp/MPPTunnel.h @@ -493,15 +493,14 @@ class MPPTunnel : private boost::noncopyable void write(TrackedMppDataPacketPtr && data); // forceWrite write a single packet to the tunnel's send queue without blocking, - // and need to call isReadForWrite first. + // and need to call waitForWritable first. // ``` - // while (!isWritable()) {} + // auto res = waitForWritable(); + // switch (res) case... // forceWrite(std::move(data)); // ``` - void forceWrite(TrackedMppDataPacketPtr && data); - bool isWritable() const; - WaitResult waitForWritable() const; + void forceWrite(TrackedMppDataPacketPtr && data); // finish the writing, and wait until the sender finishes. void writeDone(); diff --git a/dbms/src/Flash/Mpp/MPPTunnelSet.cpp b/dbms/src/Flash/Mpp/MPPTunnelSet.cpp index a499444075d..47f7fe2299c 100644 --- a/dbms/src/Flash/Mpp/MPPTunnelSet.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnelSet.cpp @@ -18,7 +18,6 @@ #include #include #include -#include "Flash/Mpp/MPPTunnel.h" namespace DB { @@ -68,17 +67,6 @@ void MPPTunnelSetBase::forceWrite(tipb::SelectResponse & response, size_ tunnels[index]->forceWrite(serializePacket(response)); } -template -bool MPPTunnelSetBase::isWritable() const -{ - for (const auto & tunnel : tunnels) - { - if (!tunnel->isWritable()) - return false; - } - return true; -} - template WaitResult MPPTunnelSetBase::waitForWritable() const { diff --git a/dbms/src/Flash/Mpp/MPPTunnelSet.h b/dbms/src/Flash/Mpp/MPPTunnelSet.h index 606a1a0281b..a57f57b3ac8 100644 --- a/dbms/src/Flash/Mpp/MPPTunnelSet.h +++ b/dbms/src/Flash/Mpp/MPPTunnelSet.h @@ -60,8 +60,6 @@ class MPPTunnelSetBase : private boost::noncopyable const std::vector & getTunnels() const { return tunnels; } - bool isWritable() const; - WaitResult waitForWritable() const; bool isLocal(size_t index) const; diff --git a/dbms/src/Flash/Mpp/MPPTunnelSetWriter.h b/dbms/src/Flash/Mpp/MPPTunnelSetWriter.h index 6bb06a0c190..1af6730f108 100644 --- a/dbms/src/Flash/Mpp/MPPTunnelSetWriter.h +++ b/dbms/src/Flash/Mpp/MPPTunnelSetWriter.h @@ -69,8 +69,6 @@ class MPPTunnelSetWriterBase : private boost::noncopyable uint16_t getPartitionNum() const { return mpp_tunnel_set->getPartitionNum(); } - virtual bool isWritable() const = 0; - virtual WaitResult waitForWritable() const = 0; protected: @@ -93,9 +91,6 @@ class SyncMPPTunnelSetWriter : public MPPTunnelSetWriterBase : MPPTunnelSetWriterBase(mpp_tunnel_set_, result_field_types_, req_id) {} - // For sync writer, `isWritable` will not be called, so an exception is thrown here. - bool isWritable() const override { throw Exception("Unsupport sync writer"); } - // For sync writer, `waitForWritable` will not be called, so an exception is thrown here. WaitResult waitForWritable() const override { throw Exception("Unsupport sync writer"); } @@ -115,8 +110,6 @@ class AsyncMPPTunnelSetWriter : public MPPTunnelSetWriterBase : MPPTunnelSetWriterBase(mpp_tunnel_set_, result_field_types_, req_id) {} - bool isWritable() const override { return mpp_tunnel_set->isWritable(); } - WaitResult waitForWritable() const override { return mpp_tunnel_set->waitForWritable(); } protected: diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp index 31520093a78..5953549827b 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp @@ -224,7 +224,6 @@ struct MockExchangeWriter // make only part 0 use local tunnel return index == 0; } - static bool isWritable() { throw Exception("Unsupport async write"); } static WaitResult waitForWritable() { throw Exception("Unsupport async write"); } private: diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp index 3d71f00cb0e..bf4a494651e 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp @@ -794,7 +794,7 @@ TEST_F(TestMPPTunnel, SyncTunnelForceWrite) mpp_tunnel_ptr->connectSync(writer_ptr.get()); GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); - ASSERT_TRUE(mpp_tunnel_ptr->isWritable()); + ASSERT_TRUE(mpp_tunnel_ptr->waitForWritable() == WaitResult::Ready); mpp_tunnel_ptr->forceWrite(newDataPacket("First")); mpp_tunnel_ptr->writeDone(); GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); @@ -811,7 +811,7 @@ TEST_F(TestMPPTunnel, AsyncTunnelForceWrite) GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); std::thread t(&MockAsyncCallData::run, call_data.get()); - ASSERT_TRUE(mpp_tunnel_ptr->isWritable()); + ASSERT_TRUE(mpp_tunnel_ptr->waitForWritable() == WaitResult::Ready); mpp_tunnel_ptr->forceWrite(newDataPacket("First")); mpp_tunnel_ptr->writeDone(); GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); @@ -828,7 +828,7 @@ TEST_F(TestMPPTunnel, LocalTunnelForceWrite) GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); std::thread t(&MockExchangeReceiver::receiveAll, receiver.get()); - ASSERT_TRUE(mpp_tunnel_ptr->isWritable()); + ASSERT_TRUE(mpp_tunnel_ptr->waitForWritable() == WaitResult::Ready); mpp_tunnel_ptr->forceWrite(newDataPacket("First")); mpp_tunnel_ptr->writeDone(); GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); @@ -846,7 +846,7 @@ try Stopwatch stop_watch{CLOCK_MONOTONIC_COARSE}; while (stop_watch.elapsedSeconds() < 3 * timeout.count()) { - ASSERT_FALSE(mpp_tunnel_ptr->isWritable()); + ASSERT_FALSE(mpp_tunnel_ptr->waitForWritable() == WaitResult::Ready); } GTEST_FAIL(); } diff --git a/dbms/src/Operators/ExchangeSenderSinkOp.cpp b/dbms/src/Operators/ExchangeSenderSinkOp.cpp index 4622e97d864..1f073c67d20 100644 --- a/dbms/src/Operators/ExchangeSenderSinkOp.cpp +++ b/dbms/src/Operators/ExchangeSenderSinkOp.cpp @@ -13,7 +13,6 @@ // limitations under the License. #include -#include "Flash/Coprocessor/WaitResult.h" namespace DB { @@ -40,12 +39,7 @@ OperatorStatus ExchangeSenderSinkOp::writeImpl(Block && block) return OperatorStatus::NEED_INPUT; } -OperatorStatus ExchangeSenderSinkOp::prepareImpl() -{ - return awaitImpl(); -} - -OperatorStatus ExchangeSenderSinkOp::awaitImpl() +OperatorStatus ExchangeSenderSinkOp::waitForWriter() const { auto res = writer->waitForWritable(); switch (res) @@ -59,4 +53,14 @@ OperatorStatus ExchangeSenderSinkOp::awaitImpl() } } +OperatorStatus ExchangeSenderSinkOp::prepareImpl() +{ + return waitForWriter(); +} + +OperatorStatus ExchangeSenderSinkOp::awaitImpl() +{ + return waitForWriter(); +} + } // namespace DB diff --git a/dbms/src/Operators/ExchangeSenderSinkOp.h b/dbms/src/Operators/ExchangeSenderSinkOp.h index b4e4702d518..6a04c10faf2 100644 --- a/dbms/src/Operators/ExchangeSenderSinkOp.h +++ b/dbms/src/Operators/ExchangeSenderSinkOp.h @@ -43,6 +43,9 @@ class ExchangeSenderSinkOp : public SinkOp OperatorStatus awaitImpl() override; +private: + OperatorStatus waitForWriter() const; + private: std::unique_ptr writer; size_t total_rows = 0; From 563573bc94f04b4e30714f177a79670924c7dab7 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 22 May 2024 19:16:28 +0800 Subject: [PATCH 05/25] try fmt --- dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp index bf4a494651e..bbea4210d36 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp @@ -31,7 +31,6 @@ #include #include - namespace DB { namespace tests From e1e920cb81b1d17af1443e5480d1c9e2bf5a1f55 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 22 May 2024 22:45:34 +0800 Subject: [PATCH 06/25] refine ut --- dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp index bbea4210d36..e2fb701431f 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp @@ -793,7 +793,7 @@ TEST_F(TestMPPTunnel, SyncTunnelForceWrite) mpp_tunnel_ptr->connectSync(writer_ptr.get()); GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); - ASSERT_TRUE(mpp_tunnel_ptr->waitForWritable() == WaitResult::Ready); + GTEST_ASSERT_EQ(mpp_tunnel_ptr->waitForWritable(), WaitResult::Ready); mpp_tunnel_ptr->forceWrite(newDataPacket("First")); mpp_tunnel_ptr->writeDone(); GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); @@ -810,7 +810,7 @@ TEST_F(TestMPPTunnel, AsyncTunnelForceWrite) GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); std::thread t(&MockAsyncCallData::run, call_data.get()); - ASSERT_TRUE(mpp_tunnel_ptr->waitForWritable() == WaitResult::Ready); + GTEST_ASSERT_EQ(mpp_tunnel_ptr->waitForWritable(), WaitResult::Ready); mpp_tunnel_ptr->forceWrite(newDataPacket("First")); mpp_tunnel_ptr->writeDone(); GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); @@ -827,7 +827,7 @@ TEST_F(TestMPPTunnel, LocalTunnelForceWrite) GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); std::thread t(&MockExchangeReceiver::receiveAll, receiver.get()); - ASSERT_TRUE(mpp_tunnel_ptr->waitForWritable() == WaitResult::Ready); + GTEST_ASSERT_EQ(mpp_tunnel_ptr->waitForWritable(), WaitResult::Ready); mpp_tunnel_ptr->forceWrite(newDataPacket("First")); mpp_tunnel_ptr->writeDone(); GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); @@ -845,7 +845,7 @@ try Stopwatch stop_watch{CLOCK_MONOTONIC_COARSE}; while (stop_watch.elapsedSeconds() < 3 * timeout.count()) { - ASSERT_FALSE(mpp_tunnel_ptr->waitForWritable() == WaitResult::Ready); + GTEST_ASSERT_EQ(mpp_tunnel_ptr->waitForWritable(), WaitResult::WaitForPolling); } GTEST_FAIL(); } From 2f258ae38747c04f487203990ff0abc3c8f662dc Mon Sep 17 00:00:00 2001 From: SeaRise Date: Thu, 23 May 2024 00:31:08 +0800 Subject: [PATCH 07/25] for receiver --- dbms/src/Flash/Mpp/LocalRequestHandler.h | 1 - dbms/src/Flash/Mpp/MPPTunnel.cpp | 2 +- dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp | 38 ++++-- dbms/src/Flash/Mpp/ReceivedMessageQueue.h | 71 +++++++++- .../Pipeline/Schedule/Tasks/NotifyFuture.cpp | 8 +- .../Pipeline/Schedule/Tasks/NotifyFuture.h | 5 +- .../Operators/ExchangeReceiverSourceOp.cpp | 123 +++++++++--------- dbms/src/Operators/ExchangeReceiverSourceOp.h | 9 +- dbms/src/Operators/SharedQueue.cpp | 4 +- dbms/src/Operators/UnorderedSourceOp.cpp | 2 +- 10 files changed, 166 insertions(+), 97 deletions(-) diff --git a/dbms/src/Flash/Mpp/LocalRequestHandler.h b/dbms/src/Flash/Mpp/LocalRequestHandler.h index a6422d79880..bf8d6e80bde 100644 --- a/dbms/src/Flash/Mpp/LocalRequestHandler.h +++ b/dbms/src/Flash/Mpp/LocalRequestHandler.h @@ -42,7 +42,6 @@ struct LocalRequestHandler bool isWritable() const { return msg_queue->isWritable(); } - void registerPipeReadTask(TaskPtr && task) const { msg_queue->registerPipeReadTask(std::move(task)); } void registerPipeWriteTask(TaskPtr && task) const { msg_queue->registerPipeWriteTask(std::move(task)); } void writeDone(bool meet_error, const String & local_err_msg) const diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index 9a16322d241..be692c7d00d 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -403,7 +403,7 @@ WaitResult MPPTunnel::waitForWritable() const RUNTIME_CHECK_MSG(tunnel_sender != nullptr, "write to tunnel {} which is already closed.", tunnel_id); if (!tunnel_sender->isWritable()) { - setNotifyFuture(tunnel_sender); + setNotifyFuture(tunnel_sender.get()); return WaitResult::WaitForNotify; } return WaitResult::Ready; diff --git a/dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp b/dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp index 63096695eb8..40bd09c7863 100644 --- a/dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp +++ b/dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp @@ -15,6 +15,8 @@ #include #include +#include "Flash/Pipeline/Schedule/Tasks/NotifyFuture.h" + namespace DB { namespace FailPoints @@ -104,7 +106,7 @@ ReceivedMessageQueue::ReceivedMessageQueue( : std::function([this](const ReceivedMessagePtr & element) { for (size_t i = 0; i < fine_grained_channel_size; ++i) { - auto result = msg_channels_for_fine_grained_shuffle[i]->forcePush(element); + auto result = msg_channels_for_fine_grained_shuffle[i].forcePush(element); RUNTIME_CHECK_MSG(result == MPMCQueueResult::OK, "push to fine grained channel must success"); } })) @@ -114,9 +116,7 @@ ReceivedMessageQueue::ReceivedMessageQueue( assert(fine_grained_channel_size > 0); msg_channels_for_fine_grained_shuffle.reserve(fine_grained_channel_size); for (size_t i = 0; i < fine_grained_channel_size; ++i) - /// these are unbounded queues - msg_channels_for_fine_grained_shuffle.push_back( - std::make_shared>(std::numeric_limits::max())); + msg_channels_for_fine_grained_shuffle.emplace_back(); } } @@ -127,9 +127,9 @@ MPMCQueueResult ReceivedMessageQueue::pop(size_t stream_id, ReceivedMessagePtr & if (fine_grained_channel_size > 0) { if constexpr (need_wait) - res = msg_channels_for_fine_grained_shuffle[stream_id]->pop(recv_msg); + res = msg_channels_for_fine_grained_shuffle[stream_id].pop(recv_msg); else - res = msg_channels_for_fine_grained_shuffle[stream_id]->tryPop(recv_msg); + res = msg_channels_for_fine_grained_shuffle[stream_id].tryPop(recv_msg); if (res == MPMCQueueResult::OK) { @@ -152,6 +152,15 @@ MPMCQueueResult ReceivedMessageQueue::pop(size_t stream_id, ReceivedMessagePtr & grpc_recv_queue.tryDequeue(); #endif } + ExchangeReceiverMetric::subDataSizeMetric(*data_size_in_queue, recv_msg->getPacket().ByteSizeLong()); + } + else + { + if constexpr (!need_wait) + { + if (res == MPMCQueueResult::EMPTY) + setNotifyFuture(&msg_channels_for_fine_grained_shuffle[stream_id]); + } } } else @@ -160,13 +169,20 @@ MPMCQueueResult ReceivedMessageQueue::pop(size_t stream_id, ReceivedMessagePtr & res = grpc_recv_queue.pop(recv_msg); else res = grpc_recv_queue.tryPop(recv_msg); - } - if (res == MPMCQueueResult::OK) - { - ExchangeReceiverMetric::subDataSizeMetric(*data_size_in_queue, recv_msg->getPacket().ByteSizeLong()); + if (res == MPMCQueueResult::OK) + { + ExchangeReceiverMetric::subDataSizeMetric(*data_size_in_queue, recv_msg->getPacket().ByteSizeLong()); + } + else + { + if constexpr (!need_wait) + { + if (res == MPMCQueueResult::EMPTY) + setNotifyFuture(&grpc_recv_queue); + } + } } - return res; } diff --git a/dbms/src/Flash/Mpp/ReceivedMessageQueue.h b/dbms/src/Flash/Mpp/ReceivedMessageQueue.h index c975e4d2ab1..81907a52f36 100644 --- a/dbms/src/Flash/Mpp/ReceivedMessageQueue.h +++ b/dbms/src/Flash/Mpp/ReceivedMessageQueue.h @@ -23,6 +23,9 @@ #include #include +#include + +#include "Flash/Pipeline/Schedule/Tasks/NotifyFuture.h" namespace DB { @@ -55,6 +58,65 @@ enum class ReceiverMode Async }; +class GRPCNotifyQueue : public NotifyFuture +{ +public: + template + explicit GRPCNotifyQueue(const LoggerPtr & log_, Args &&... args) + : queue(log_, std::forward(args)...) + {} + + void registerTask(TaskPtr && task) override { queue.registerPipeReadTask(std::move(task)); } + + MPMCQueueResult pop(ReceivedMessagePtr & data) { return queue.pop(data); } + + MPMCQueueResult tryPop(ReceivedMessagePtr & data) { return queue.tryPop(data); } + + MPMCQueueResult forcePush(ReceivedMessagePtr && data) { return queue.forcePush(std::move(data)); } + + MPMCQueueResult push(ReceivedMessagePtr && data) { return queue.push(std::move(data)); } + + MPMCQueueResult tryDequeue() { return queue.tryDequeue(); } + + MPMCQueueResult pushWithTag(ReceivedMessagePtr && data, GRPCKickTag * new_tag) + { + return queue.pushWithTag(std::move(data), new_tag); + } + + void setKickFuncForTest(GRPCKickFunc && func) { queue.setKickFuncForTest(std::move(func)); } + + bool finish() { return queue.finish(); } + bool cancel() { return queue.cancel(); } + + bool isWritable() const { return queue.isWritable(); } + + void registerPipeWriteTask(TaskPtr && task) { queue.registerPipeWriteTask(std::move(task)); } + +private: + GRPCRecvQueue queue; +}; + +class MSGChannel : public NotifyFuture +{ +public: + void registerTask(TaskPtr && task) override { queue_ref.registerPipeReadTask(std::move(task)); } + + MPMCQueueResult pop(ReceivedMessagePtr & data) { return queue_ref.pop(data); } + + MPMCQueueResult tryPop(ReceivedMessagePtr & data) { return queue_ref.tryPop(data); } + + MPMCQueueResult forcePush(const ReceivedMessagePtr & data) { return queue_ref.forcePush(data); } + + bool finish() { return queue_ref.finish(); } + bool cancel() { return queue_ref.cancel(); } + +private: + using QueueImpl = LooseBoundedMPMCQueue; + // these are unbounded queues. + std::shared_ptr queue = std::make_shared(std::numeric_limits::max()); + QueueImpl & queue_ref = *queue; +}; + class ReceivedMessageQueue { public: @@ -86,7 +148,7 @@ class ReceivedMessageQueue grpc_recv_queue.finish(); /// msg_channels_for_fine_grained_shuffle must be finished after msg_channel is finished for (auto & channel : msg_channels_for_fine_grained_shuffle) - channel->finish(); + channel.finish(); } void cancel() @@ -94,12 +156,11 @@ class ReceivedMessageQueue grpc_recv_queue.cancel(); /// msg_channels_for_fine_grained_shuffle must be cancelled after msg_channel is cancelled for (auto & channel : msg_channels_for_fine_grained_shuffle) - channel->cancel(); + channel.cancel(); } bool isWritable() const { return grpc_recv_queue.isWritable(); } - void registerPipeReadTask(TaskPtr && task) { grpc_recv_queue.registerPipeReadTask(std::move(task)); } void registerPipeWriteTask(TaskPtr && task) { grpc_recv_queue.registerPipeWriteTask(std::move(task)); } #ifndef DBMS_PUBLIC_GTEST @@ -118,8 +179,8 @@ class ReceivedMessageQueue /// write: the writer first write the msg to msg_channel/grpc_recv_queue, if write success, then write msg to msg_channels_for_fine_grained_shuffle /// read: the reader read msg from msg_channels_for_fine_grained_shuffle, and reduce the `remaining_consumers` in msg, if `remaining_consumers` is 0, then /// remove the msg from msg_channel/grpc_recv_queue - std::vector>> msg_channels_for_fine_grained_shuffle; - GRPCRecvQueue grpc_recv_queue; + std::vector msg_channels_for_fine_grained_shuffle; + GRPCNotifyQueue grpc_recv_queue; }; } // namespace DB diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.cpp b/dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.cpp index b0a4fef9b05..587c20c2ff5 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.cpp @@ -16,9 +16,9 @@ namespace DB { -thread_local NotifyFuturePtr current_notify_future = nullptr; +thread_local NotifyFuture * current_notify_future = nullptr; -void setNotifyFuture(NotifyFuturePtr new_future) +void setNotifyFuture(NotifyFuture * new_future) { assert(current_notify_future == nullptr); current_notify_future = std::move(new_future); @@ -26,13 +26,13 @@ void setNotifyFuture(NotifyFuturePtr new_future) void clearNotifyFuture() { - current_notify_future.reset(); + current_notify_future = nullptr; } void registerTaskToFuture(TaskPtr && task) { assert(current_notify_future != nullptr); current_notify_future->registerTask(std::move(task)); - current_notify_future.reset(); + current_notify_future = nullptr; } } // namespace DB diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.h index 5ba6c977f02..ca86dad7587 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.h @@ -24,11 +24,10 @@ struct NotifyFuture virtual ~NotifyFuture() = default; virtual void registerTask(TaskPtr && task) = 0; }; -using NotifyFuturePtr = std::shared_ptr; -extern thread_local NotifyFuturePtr current_notify_future; +extern thread_local NotifyFuture * current_notify_future; -void setNotifyFuture(NotifyFuturePtr new_future); +void setNotifyFuture(NotifyFuture * new_future); void clearNotifyFuture(); void registerTaskToFuture(TaskPtr && task); diff --git a/dbms/src/Operators/ExchangeReceiverSourceOp.cpp b/dbms/src/Operators/ExchangeReceiverSourceOp.cpp index 8e017fa497c..968d55f3425 100644 --- a/dbms/src/Operators/ExchangeReceiverSourceOp.cpp +++ b/dbms/src/Operators/ExchangeReceiverSourceOp.cpp @@ -29,6 +29,37 @@ Block ExchangeReceiverSourceOp::popFromBlockQueue() return block; } +void ExchangeReceiverSourceOp::recordDecodeDetail( + const DecodeDetail & decode_detail, + size_t index, + const String & req_info) +{ + auto & connection_profile_info = io_profile_info->connection_profile_infos[index]; + connection_profile_info.packets += decode_detail.packets; + connection_profile_info.bytes += decode_detail.packet_bytes; + total_rows += decode_detail.rows; + LOG_TRACE( + log, + "recv {} rows from exchange receiver for {}, total recv row num: {}", + decode_detail.rows, + req_info, + total_rows); +} + +void ExchangeReceiverSourceOp::handleError(const ExchangeReceiverResult & result) const +{ + if (result.meet_error) + { + LOG_WARNING(log, "exchange receiver meets error: {}", result.error_msg); + throw Exception(result.error_msg); + } + if (result.resp != nullptr && result.resp->has_error()) + { + LOG_WARNING(log, "exchange receiver meets error: {}", result.resp->error().DebugString()); + throw Exception(result.resp->error().DebugString()); + } +} + OperatorStatus ExchangeReceiverSourceOp::readImpl(Block & block) { if (!block_queue.empty()) @@ -40,77 +71,41 @@ OperatorStatus ExchangeReceiverSourceOp::readImpl(Block & block) while (true) { assert(block_queue.empty()); - auto await_status = awaitImpl(); - if (await_status == OperatorStatus::HAS_OUTPUT) + ReceivedMessagePtr recv_msg = nullptr; + auto receive_status = exchange_receiver->tryReceive(stream_id, recv_msg); + switch (receive_status) { - assert(receive_status != ReceiveStatus::empty); - auto result - = exchange_receiver - ->toExchangeReceiveResult(stream_id, receive_status, recv_msg, block_queue, header, decoder_ptr); - recv_msg = nullptr; - receive_status = ReceiveStatus::empty; - - if (result.meet_error) - { - LOG_WARNING(log, "exchange receiver meets error: {}", result.error_msg); - throw Exception(result.error_msg); - } - if (result.resp != nullptr && result.resp->has_error()) - { - LOG_WARNING(log, "exchange receiver meets error: {}", result.resp->error().DebugString()); - throw Exception(result.resp->error().DebugString()); - } - if (result.eof) - { - LOG_DEBUG(log, "exchange receiver meets eof"); - return OperatorStatus::HAS_OUTPUT; - } - - /// only the last response contains execution summaries - if (result.resp != nullptr) - io_profile_info->remote_execution_summary.add(*result.resp); - - const auto & decode_detail = result.decode_detail; - auto & connection_profile_info = io_profile_info->connection_profile_infos[result.call_index]; - connection_profile_info.packets += decode_detail.packets; - connection_profile_info.bytes += decode_detail.packet_bytes; - - total_rows += decode_detail.rows; - LOG_TRACE( - log, - "recv {} rows from exchange receiver for {}, total recv row num: {}", - decode_detail.rows, - result.req_info, - total_rows); + case ReceiveStatus::empty: + assert(!recv_msg); + assert(current_notify_future); + return OperatorStatus::WAIT_FOR_NOTIFY; + case ReceiveStatus::ok: + assert(recv_msg); + case ReceiveStatus::eof: + break; + } - if (decode_detail.rows <= 0) - continue; + auto result + = exchange_receiver + ->toExchangeReceiveResult(stream_id, receive_status, recv_msg, block_queue, header, decoder_ptr); - block = popFromBlockQueue(); + handleError(result); + if (result.eof) + { + LOG_DEBUG(log, "exchange receiver meets eof"); return OperatorStatus::HAS_OUTPUT; } - return await_status; - } -} -OperatorStatus ExchangeReceiverSourceOp::awaitImpl() -{ - if unlikely (!block_queue.empty()) - return OperatorStatus::HAS_OUTPUT; - if unlikely (receive_status != ReceiveStatus::empty) - return OperatorStatus::HAS_OUTPUT; + /// only the last response contains execution summaries + if (result.resp != nullptr) + io_profile_info->remote_execution_summary.add(*result.resp); - assert(!recv_msg); - receive_status = exchange_receiver->tryReceive(stream_id, recv_msg); - switch (receive_status) - { - case ReceiveStatus::ok: - assert(recv_msg); - return OperatorStatus::HAS_OUTPUT; - case ReceiveStatus::empty: - assert(!recv_msg); - return OperatorStatus::WAITING; - case ReceiveStatus::eof: + const auto & decode_detail = result.decode_detail; + recordDecodeDetail(result.decode_detail, result.call_index, result.req_info); + if (decode_detail.rows <= 0) + continue; + + block = popFromBlockQueue(); return OperatorStatus::HAS_OUTPUT; } } diff --git a/dbms/src/Operators/ExchangeReceiverSourceOp.h b/dbms/src/Operators/ExchangeReceiverSourceOp.h index f7f1e86c99f..9185f51bf0d 100644 --- a/dbms/src/Operators/ExchangeReceiverSourceOp.h +++ b/dbms/src/Operators/ExchangeReceiverSourceOp.h @@ -48,20 +48,19 @@ class ExchangeReceiverSourceOp : public SourceOp OperatorStatus readImpl(Block & block) override; - OperatorStatus awaitImpl() override; - private: Block popFromBlockQueue(); + void recordDecodeDetail(const DecodeDetail & decode_detail, size_t index, const String & req_info); + + void handleError(const ExchangeReceiverResult & result) const; + private: std::shared_ptr exchange_receiver; std::unique_ptr decoder_ptr; uint64_t total_rows{}; std::queue block_queue; - ReceivedMessagePtr recv_msg = nullptr; - ReceiveStatus receive_status = ReceiveStatus::empty; - size_t stream_id; IOProfileInfoPtr io_profile_info; diff --git a/dbms/src/Operators/SharedQueue.cpp b/dbms/src/Operators/SharedQueue.cpp index 230b465fea4..72b3069d173 100644 --- a/dbms/src/Operators/SharedQueue.cpp +++ b/dbms/src/Operators/SharedQueue.cpp @@ -89,7 +89,7 @@ OperatorStatus SharedQueueSinkOp::tryFlush() switch (queue_result) { case MPMCQueueResult::FULL: - setNotifyFuture(shared_queue); + setNotifyFuture(shared_queue.get()); return OperatorStatus::WAIT_FOR_NOTIFY; case MPMCQueueResult::OK: buffer.reset(); @@ -111,7 +111,7 @@ OperatorStatus SharedQueueSourceOp::readImpl(Block & block) switch (queue_result) { case MPMCQueueResult::EMPTY: - setNotifyFuture(shared_queue); + setNotifyFuture(shared_queue.get()); return OperatorStatus::WAIT_FOR_NOTIFY; case MPMCQueueResult::OK: return OperatorStatus::HAS_OUTPUT; diff --git a/dbms/src/Operators/UnorderedSourceOp.cpp b/dbms/src/Operators/UnorderedSourceOp.cpp index 074db28a43c..6b31ecf8040 100644 --- a/dbms/src/Operators/UnorderedSourceOp.cpp +++ b/dbms/src/Operators/UnorderedSourceOp.cpp @@ -45,7 +45,7 @@ OperatorStatus UnorderedSourceOp::readImpl(Block & block) { if (!task_pool->tryPopBlock(block)) { - setNotifyFuture(task_pool); + setNotifyFuture(task_pool.get()); return OperatorStatus::WAIT_FOR_NOTIFY; } From f8e1dc6e413c0d85ec94eee881cf09b0b274c51d Mon Sep 17 00:00:00 2001 From: SeaRise Date: Thu, 23 May 2024 00:33:16 +0800 Subject: [PATCH 08/25] u --- dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp | 2 -- dbms/src/Flash/Mpp/ReceivedMessageQueue.h | 2 -- 2 files changed, 4 deletions(-) diff --git a/dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp b/dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp index 40bd09c7863..19244e979cd 100644 --- a/dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp +++ b/dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp @@ -15,8 +15,6 @@ #include #include -#include "Flash/Pipeline/Schedule/Tasks/NotifyFuture.h" - namespace DB { namespace FailPoints diff --git a/dbms/src/Flash/Mpp/ReceivedMessageQueue.h b/dbms/src/Flash/Mpp/ReceivedMessageQueue.h index 81907a52f36..3a85c760381 100644 --- a/dbms/src/Flash/Mpp/ReceivedMessageQueue.h +++ b/dbms/src/Flash/Mpp/ReceivedMessageQueue.h @@ -25,8 +25,6 @@ #include #include -#include "Flash/Pipeline/Schedule/Tasks/NotifyFuture.h" - namespace DB { namespace ExchangeReceiverMetric From e2adf5e04ce7af7c42d399c9c13e5e654bf4665d Mon Sep 17 00:00:00 2001 From: SeaRise Date: Thu, 23 May 2024 00:36:08 +0800 Subject: [PATCH 09/25] fix --- dbms/src/Flash/Mpp/ReceivedMessageQueue.h | 1 + 1 file changed, 1 insertion(+) diff --git a/dbms/src/Flash/Mpp/ReceivedMessageQueue.h b/dbms/src/Flash/Mpp/ReceivedMessageQueue.h index 3a85c760381..1da587e16a7 100644 --- a/dbms/src/Flash/Mpp/ReceivedMessageQueue.h +++ b/dbms/src/Flash/Mpp/ReceivedMessageQueue.h @@ -21,6 +21,7 @@ #include #include #include +#include #include #include From c5655c667736725e2754e16b8488abfcc750c06c Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Thu, 23 May 2024 14:33:13 +0800 Subject: [PATCH 10/25] format Signed-off-by: Lloyd-Pottiger --- dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp index e2fb701431f..ff11aef5ec1 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp @@ -750,8 +750,7 @@ try tunnel->write(std::move(packet)); } catch (...) - { - } + {} }; std::thread thd(tunnelRun, std::move(run_tunnel)); thd.join(); From da81c3c378c49683d237c7ef5e148188c2868467 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Thu, 23 May 2024 14:41:13 +0800 Subject: [PATCH 11/25] Update gtest_mpptunnel.cpp --- dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp index e2fb701431f..ff11aef5ec1 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp @@ -750,8 +750,7 @@ try tunnel->write(std::move(packet)); } catch (...) - { - } + {} }; std::thread thd(tunnelRun, std::move(run_tunnel)); thd.join(); From 03dd7b6bab9b8dea3bfc5a22cc6389969c017bc4 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Mon, 27 May 2024 14:47:51 +0800 Subject: [PATCH 12/25] fix comment --- dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h index e47ce10f38c..1943b48cd0c 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h @@ -26,12 +26,12 @@ namespace DB * CANCELLED/ERROR/FINISHED * ▲ * │ - * ┌───────────────────────────────┐ - * │ ┌──►RUNNING◄──┐ │ - * INIT───►│ │ │ │ - * │ ▼ ▼ │ - * │ WATITING◄────────►IO_IN/OUT │ - * └───────────────────────────────┘ + * ┌───────────────────────────────────────────────┐ + * │ ┌──────────►RUNNING◄──────────┐ │ + * │ │ │ │ + * │ ▼ ▼ │ + * │ WATITING/WAIT_FOR_NOTIFY◄────────►IO_IN/OUT │ + * └───────────────────────────────────────────────┘ */ enum class ExecTaskStatus { From df2350696de3e3a8928884a3d774edef837812fb Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 5 Jun 2024 12:57:50 +0800 Subject: [PATCH 13/25] fix may hang issue --- dbms/src/Flash/Executor/PipelineExecutor.cpp | 1 + dbms/src/Flash/Executor/PipelineExecutorContext.cpp | 8 ++++++++ dbms/src/Flash/Executor/PipelineExecutorContext.h | 6 ++++++ 3 files changed, 15 insertions(+) diff --git a/dbms/src/Flash/Executor/PipelineExecutor.cpp b/dbms/src/Flash/Executor/PipelineExecutor.cpp index 602531fa49e..bb3f5833900 100644 --- a/dbms/src/Flash/Executor/PipelineExecutor.cpp +++ b/dbms/src/Flash/Executor/PipelineExecutor.cpp @@ -34,6 +34,7 @@ PipelineExecutor::PipelineExecutor( // But for cop/batchCop, there is no such unique identifier, so an empty value is given here, indicating that the query id of PipelineExecutor is invalid. /*query_id=*/context.getDAGContext()->isMPPTask() ? context.getDAGContext()->getMPPTaskId().toString() : "", req_id, + context.getDAGContext(), memory_tracker_, auto_spill_trigger, register_operator_spill_context, diff --git a/dbms/src/Flash/Executor/PipelineExecutorContext.cpp b/dbms/src/Flash/Executor/PipelineExecutorContext.cpp index 516fdc36e3c..72a5e7a2bcb 100644 --- a/dbms/src/Flash/Executor/PipelineExecutorContext.cpp +++ b/dbms/src/Flash/Executor/PipelineExecutorContext.cpp @@ -12,7 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include +#include #include #include @@ -153,6 +155,12 @@ void PipelineExecutorContext::cancel() bool origin_value = false; if (is_cancelled.compare_exchange_strong(origin_value, true, std::memory_order_release)) { + if (likely(dag_context)) + { + // Cancel the tunnel_set here to prevent pipeline tasks waiting in the WAIT_FOR_NOTIFY state from never being notified. + if (dag_context->tunnel_set) + dag_context->tunnel_set->close("", false); + } cancelSharedQueues(); if likely (TaskScheduler::instance && !query_id.empty()) TaskScheduler::instance->cancel(query_id, resource_group_name); diff --git a/dbms/src/Flash/Executor/PipelineExecutorContext.h b/dbms/src/Flash/Executor/PipelineExecutorContext.h index fca5291c4da..951d4465da8 100644 --- a/dbms/src/Flash/Executor/PipelineExecutorContext.h +++ b/dbms/src/Flash/Executor/PipelineExecutorContext.h @@ -34,6 +34,8 @@ using RegisterOperatorSpillContext = std::function; +class DAGContext; + class PipelineExecutorContext : private boost::noncopyable { public: @@ -50,12 +52,14 @@ class PipelineExecutorContext : private boost::noncopyable PipelineExecutorContext( const String & query_id_, const String & req_id, + DAGContext * dag_context_, const MemoryTrackerPtr & mem_tracker_, AutoSpillTrigger * auto_spill_trigger_ = nullptr, const RegisterOperatorSpillContext & register_operator_spill_context_ = nullptr, const String & resource_group_name_ = "") : query_id(query_id_) , log(Logger::get(req_id)) + , dag_context(dag_context_) , mem_tracker(mem_tracker_) , auto_spill_trigger(auto_spill_trigger_) , register_operator_spill_context(register_operator_spill_context_) @@ -194,6 +198,8 @@ class PipelineExecutorContext : private boost::noncopyable LoggerPtr log; + DAGContext * dag_context{nullptr}; + MemoryTrackerPtr mem_tracker; std::mutex mu; From 0c43a11a70aa86e75c3e482f00d584615b1a44ae Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 5 Jun 2024 13:31:29 +0800 Subject: [PATCH 14/25] some fix --- dbms/src/Flash/Executor/PipelineExecutorContext.cpp | 2 +- dbms/src/Flash/Mpp/MPPTask.cpp | 6 +++--- dbms/src/Flash/Mpp/MPPTunnel.h | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/dbms/src/Flash/Executor/PipelineExecutorContext.cpp b/dbms/src/Flash/Executor/PipelineExecutorContext.cpp index 72a5e7a2bcb..a949010d505 100644 --- a/dbms/src/Flash/Executor/PipelineExecutorContext.cpp +++ b/dbms/src/Flash/Executor/PipelineExecutorContext.cpp @@ -159,7 +159,7 @@ void PipelineExecutorContext::cancel() { // Cancel the tunnel_set here to prevent pipeline tasks waiting in the WAIT_FOR_NOTIFY state from never being notified. if (dag_context->tunnel_set) - dag_context->tunnel_set->close("", false); + dag_context->tunnel_set->close(getExceptionMsg(), false); } cancelSharedQueues(); if likely (TaskScheduler::instance && !query_id.empty()) diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index 75c5c221ed4..2bac7bcd115 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -759,13 +759,13 @@ void MPPTask::abort(const String & message, AbortType abort_type) } else if (previous_status == RUNNING && switchStatus(RUNNING, next_task_status)) { - /// abort the components from top to bottom because if bottom components are aborted - /// first, the top components may see an error caused by the abort, which is not + /// abort mpptunnels first because if others components are aborted + /// first, the mpptunnels may see an error caused by the abort, which is not /// the original error setErrString(message); abortTunnels(message, false); - abortQueryExecutor(); abortReceivers(); + abortQueryExecutor(); scheduleThisTask(ScheduleState::FAILED); /// runImpl is running, leave remaining work to runImpl LOG_WARNING(log, "Finish abort task from running"); diff --git a/dbms/src/Flash/Mpp/MPPTunnel.h b/dbms/src/Flash/Mpp/MPPTunnel.h index 6eb57fd14b7..4c2421437e4 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.h +++ b/dbms/src/Flash/Mpp/MPPTunnel.h @@ -336,7 +336,7 @@ class LocalTunnelSenderV2 : public TunnelSender template bool pushImpl(TrackedMppDataPacketPtr && data) { - if (unlikely(checkPacketErr(data))) + if (unlikely(is_done || checkPacketErr(data))) return false; // When ExchangeReceiver receives data from local and remote tiflash, number of local tunnel threads From 94d954b696a1edee74b7839a583d2d2d60283f25 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 5 Jun 2024 13:48:50 +0800 Subject: [PATCH 15/25] fix ut --- dbms/src/Flash/Executor/PipelineExecutor.cpp | 2 +- dbms/src/Flash/Executor/PipelineExecutorContext.h | 8 ++++---- .../TaskQueues/tests/gtest_resource_control_queue.cpp | 6 +++++- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/dbms/src/Flash/Executor/PipelineExecutor.cpp b/dbms/src/Flash/Executor/PipelineExecutor.cpp index bb3f5833900..b7c86b27ef2 100644 --- a/dbms/src/Flash/Executor/PipelineExecutor.cpp +++ b/dbms/src/Flash/Executor/PipelineExecutor.cpp @@ -34,8 +34,8 @@ PipelineExecutor::PipelineExecutor( // But for cop/batchCop, there is no such unique identifier, so an empty value is given here, indicating that the query id of PipelineExecutor is invalid. /*query_id=*/context.getDAGContext()->isMPPTask() ? context.getDAGContext()->getMPPTaskId().toString() : "", req_id, - context.getDAGContext(), memory_tracker_, + context.getDAGContext(), auto_spill_trigger, register_operator_spill_context, context.getDAGContext()->getResourceGroupName()) diff --git a/dbms/src/Flash/Executor/PipelineExecutorContext.h b/dbms/src/Flash/Executor/PipelineExecutorContext.h index 951d4465da8..3181d6fe6aa 100644 --- a/dbms/src/Flash/Executor/PipelineExecutorContext.h +++ b/dbms/src/Flash/Executor/PipelineExecutorContext.h @@ -52,15 +52,15 @@ class PipelineExecutorContext : private boost::noncopyable PipelineExecutorContext( const String & query_id_, const String & req_id, - DAGContext * dag_context_, const MemoryTrackerPtr & mem_tracker_, + DAGContext * dag_context_ = nullptr, AutoSpillTrigger * auto_spill_trigger_ = nullptr, const RegisterOperatorSpillContext & register_operator_spill_context_ = nullptr, const String & resource_group_name_ = "") : query_id(query_id_) , log(Logger::get(req_id)) - , dag_context(dag_context_) , mem_tracker(mem_tracker_) + , dag_context(dag_context_) , auto_spill_trigger(auto_spill_trigger_) , register_operator_spill_context(register_operator_spill_context_) , resource_group_name(resource_group_name_) @@ -198,10 +198,10 @@ class PipelineExecutorContext : private boost::noncopyable LoggerPtr log; - DAGContext * dag_context{nullptr}; - MemoryTrackerPtr mem_tracker; + DAGContext * dag_context{nullptr}; + std::mutex mu; std::condition_variable cv; std::exception_ptr exception_ptr; diff --git a/dbms/src/Flash/Pipeline/Schedule/TaskQueues/tests/gtest_resource_control_queue.cpp b/dbms/src/Flash/Pipeline/Schedule/TaskQueues/tests/gtest_resource_control_queue.cpp index 5003117c91c..76565dac6ea 100644 --- a/dbms/src/Flash/Pipeline/Schedule/TaskQueues/tests/gtest_resource_control_queue.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/TaskQueues/tests/gtest_resource_control_queue.cpp @@ -176,6 +176,7 @@ class TestResourceControlQueue : public ::testing::Test mem_tracker, nullptr, nullptr, + nullptr, resource_group_name); } return all_contexts; @@ -378,6 +379,7 @@ TEST_F(TestResourceControlQueue, BasicTest) mem_tracker, nullptr, nullptr, + nullptr, group_name); for (int j = 0; j < task_num_per_resource_group; ++j) @@ -413,6 +415,7 @@ TEST_F(TestResourceControlQueue, BasicTimeoutTest) mem_tracker, nullptr, nullptr, + nullptr, group_name); auto task = std::make_unique(*exec_context); @@ -441,7 +444,8 @@ TEST_F(TestResourceControlQueue, RunOutOfRU) TaskSchedulerConfig config{thread_num, thread_num}; TaskScheduler task_scheduler(config); - PipelineExecutorContext exec_context("mock-query-id", "mock-req-id", mem_tracker, nullptr, nullptr, rg_name); + PipelineExecutorContext + exec_context("mock-query-id", "mock-req-id", mem_tracker, nullptr, nullptr, nullptr, rg_name); auto task = std::make_unique(exec_context); // This task should use 5*100ms cpu_time. From dd1316fb66753eb636ab216f8d2fae17ec618c49 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 5 Jun 2024 14:00:11 +0800 Subject: [PATCH 16/25] refine err msg --- .../Executor/PipelineExecutorContext.cpp | 23 +++++++++++++++++-- .../Flash/Executor/PipelineExecutorContext.h | 2 ++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/dbms/src/Flash/Executor/PipelineExecutorContext.cpp b/dbms/src/Flash/Executor/PipelineExecutorContext.cpp index a949010d505..a3266cec721 100644 --- a/dbms/src/Flash/Executor/PipelineExecutorContext.cpp +++ b/dbms/src/Flash/Executor/PipelineExecutorContext.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -53,6 +54,24 @@ String PipelineExecutorContext::getExceptionMsg() } } +String PipelineExecutorContext::getTrimmedErrMsg() +{ + try + { + auto cur_exception_ptr = getExceptionPtr(); + if (!cur_exception_ptr) + return ""; + std::rethrow_exception(cur_exception_ptr); + } + catch (...) + { + auto err_msg = getCurrentExceptionMessage(true, true); + if (likely(!err_msg.empty())) + trimStackTrace(err_msg); + return err_msg; + } +} + void PipelineExecutorContext::onErrorOccurred(const String & err_msg) { DB::Exception e(err_msg); @@ -155,13 +174,13 @@ void PipelineExecutorContext::cancel() bool origin_value = false; if (is_cancelled.compare_exchange_strong(origin_value, true, std::memory_order_release)) { + cancelSharedQueues(); if (likely(dag_context)) { // Cancel the tunnel_set here to prevent pipeline tasks waiting in the WAIT_FOR_NOTIFY state from never being notified. if (dag_context->tunnel_set) - dag_context->tunnel_set->close(getExceptionMsg(), false); + dag_context->tunnel_set->close(getTrimmedErrMsg(), false); } - cancelSharedQueues(); if likely (TaskScheduler::instance && !query_id.empty()) TaskScheduler::instance->cancel(query_id, resource_group_name); } diff --git a/dbms/src/Flash/Executor/PipelineExecutorContext.h b/dbms/src/Flash/Executor/PipelineExecutorContext.h index 3181d6fe6aa..b0f8d6b0946 100644 --- a/dbms/src/Flash/Executor/PipelineExecutorContext.h +++ b/dbms/src/Flash/Executor/PipelineExecutorContext.h @@ -186,6 +186,8 @@ class PipelineExecutorContext : private boost::noncopyable private: bool setExceptionPtr(const std::exception_ptr & exception_ptr_); + String getTrimmedErrMsg(); + // Need to be called under lock. bool isWaitMode(); From 902291bdfdc74ead62eac47ff0154de23239e115 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Thu, 6 Jun 2024 01:23:02 +0800 Subject: [PATCH 17/25] add cancel --- dbms/src/Flash/Coprocessor/DAGContext.h | 3 ++- dbms/src/Flash/Executor/PipelineExecutorContext.cpp | 6 +++++- dbms/src/Flash/Mpp/ReceivedMessageQueue.h | 2 +- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h index 50776d4061a..bb9d2ccc610 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.h +++ b/dbms/src/Flash/Coprocessor/DAGContext.h @@ -374,6 +374,8 @@ class DAGContext /* const */ bool is_disaggregated_task = false; // a disagg task handling by the write node // `tunnel_set` is always set by `MPPTask` and is used later. MPPTunnelSetPtr tunnel_set; + // `mpp_receiver_set` is always set by `MPPTask` and is used later. + MPPReceiverSetPtr mpp_receiver_set; TablesRegionsInfo tables_regions_info; // part of regions_for_local_read + regions_for_remote_read, only used for batch-cop RegionInfoList retry_regions; @@ -443,7 +445,6 @@ class DAGContext /// warning_count is the actual warning count during the entire execution std::atomic warning_count; - MPPReceiverSetPtr mpp_receiver_set; std::vector coprocessor_readers; /// vector of SubqueriesForSets(such as join build subquery). /// The order of the vector is also the order of the subquery. diff --git a/dbms/src/Flash/Executor/PipelineExecutorContext.cpp b/dbms/src/Flash/Executor/PipelineExecutorContext.cpp index a3266cec721..deb7e43bdb8 100644 --- a/dbms/src/Flash/Executor/PipelineExecutorContext.cpp +++ b/dbms/src/Flash/Executor/PipelineExecutorContext.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -177,9 +178,12 @@ void PipelineExecutorContext::cancel() cancelSharedQueues(); if (likely(dag_context)) { - // Cancel the tunnel_set here to prevent pipeline tasks waiting in the WAIT_FOR_NOTIFY state from never being notified. + // Cancel the tunnel_set and mpp_receiver_set here to prevent + // pipeline tasks waiting in the WAIT_FOR_NOTIFY state from never being notified. if (dag_context->tunnel_set) dag_context->tunnel_set->close(getTrimmedErrMsg(), false); + if (dag_context->mpp_receiver_set) + dag_context->mpp_receiver_set->cancel(); } if likely (TaskScheduler::instance && !query_id.empty()) TaskScheduler::instance->cancel(query_id, resource_group_name); diff --git a/dbms/src/Flash/Mpp/ReceivedMessageQueue.h b/dbms/src/Flash/Mpp/ReceivedMessageQueue.h index 1da587e16a7..679625bc172 100644 --- a/dbms/src/Flash/Mpp/ReceivedMessageQueue.h +++ b/dbms/src/Flash/Mpp/ReceivedMessageQueue.h @@ -112,7 +112,7 @@ class MSGChannel : public NotifyFuture private: using QueueImpl = LooseBoundedMPMCQueue; // these are unbounded queues. - std::shared_ptr queue = std::make_shared(std::numeric_limits::max()); + std::unique_ptr queue = std::make_unique(std::numeric_limits::max()); QueueImpl & queue_ref = *queue; }; From 7f878ceebf93c296be2e8c77a3ecd0f9333f45c8 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Thu, 6 Jun 2024 15:28:48 +0800 Subject: [PATCH 18/25] u --- .../Flash/Pipeline/Schedule/Tasks/Impls/StreamRestoreTask.cpp | 2 +- dbms/src/Operators/GetResultSinkOp.cpp | 2 +- dbms/src/Operators/HashProbeTransformExec.cpp | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/StreamRestoreTask.cpp b/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/StreamRestoreTask.cpp index a08a1a9aff6..105435896f4 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/StreamRestoreTask.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/StreamRestoreTask.cpp @@ -48,7 +48,7 @@ ExecTaskStatus StreamRestoreTask::tryFlush() t_block.clear(); return ExecTaskStatus::IO_IN; case MPMCQueueResult::FULL: - setNotifyFuture(sink); + setNotifyFuture(sink.get()); return ExecTaskStatus::WAIT_FOR_NOTIFY; case MPMCQueueResult::CANCELLED: return ExecTaskStatus::CANCELLED; diff --git a/dbms/src/Operators/GetResultSinkOp.cpp b/dbms/src/Operators/GetResultSinkOp.cpp index bc904676b6b..db52776aec7 100644 --- a/dbms/src/Operators/GetResultSinkOp.cpp +++ b/dbms/src/Operators/GetResultSinkOp.cpp @@ -38,7 +38,7 @@ OperatorStatus GetResultSinkOp::tryFlush() switch (queue_result) { case MPMCQueueResult::FULL: - setNotifyFuture(result_queue); + setNotifyFuture(result_queue.get()); return OperatorStatus::WAIT_FOR_NOTIFY; case MPMCQueueResult::OK: t_block.reset(); diff --git a/dbms/src/Operators/HashProbeTransformExec.cpp b/dbms/src/Operators/HashProbeTransformExec.cpp index 7c0a1e375c7..994323bdb44 100644 --- a/dbms/src/Operators/HashProbeTransformExec.cpp +++ b/dbms/src/Operators/HashProbeTransformExec.cpp @@ -129,7 +129,7 @@ bool HashProbeTransformExec::prepareProbeRestoredBlock() case MPMCQueueResult::OK: return true; case MPMCQueueResult::EMPTY: - setNotifyFuture(probe_source_holder); + setNotifyFuture(probe_source_holder.get()); return false; case MPMCQueueResult::FINISHED: case MPMCQueueResult::CANCELLED: From bd665119cdcf3ef249ef2ea1ff89d2fc7b0be897 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 4 Sep 2024 10:51:44 +0800 Subject: [PATCH 19/25] Update ReceivedMessageQueue.cpp --- dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp b/dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp index 19244e979cd..656d58a7b66 100644 --- a/dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp +++ b/dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp @@ -149,8 +149,8 @@ MPMCQueueResult ReceivedMessageQueue::pop(size_t stream_id, ReceivedMessagePtr & #else grpc_recv_queue.tryDequeue(); #endif + ExchangeReceiverMetric::subDataSizeMetric(*data_size_in_queue, recv_msg->getPacket().ByteSizeLong()); } - ExchangeReceiverMetric::subDataSizeMetric(*data_size_in_queue, recv_msg->getPacket().ByteSizeLong()); } else { From d13644d9b988c2df25da0ef66220475dc1729692 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 18 Sep 2024 16:39:27 +0800 Subject: [PATCH 20/25] Update DAGContext.h --- dbms/src/Flash/Coprocessor/DAGContext.h | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h index a3033010db3..c52b0d63e23 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.h +++ b/dbms/src/Flash/Coprocessor/DAGContext.h @@ -351,6 +351,8 @@ class DAGContext UInt64 getConnectionID() const { return connection_id; } const String & getConnectionAlias() const { return connection_alias; } + MPPReceiverSetPtr getMPPReceiverSet() const { return mpp_receiver_set; } + public: DAGRequest dag_request; /// Some existing code inherited from Clickhouse assume that each query must have a valid query string and query ast, @@ -374,8 +376,6 @@ class DAGContext /* const */ bool is_disaggregated_task = false; // a disagg task handling by the write node // `tunnel_set` is always set by `MPPTask` and is used later. MPPTunnelSetPtr tunnel_set; - // `mpp_receiver_set` is always set by `MPPTask` and is used later. - MPPReceiverSetPtr mpp_receiver_set; TablesRegionsInfo tables_regions_info; // part of regions_for_local_read + regions_for_remote_read, only used for batch-cop RegionInfoList retry_regions; @@ -445,6 +445,8 @@ class DAGContext /// warning_count is the actual warning count during the entire execution std::atomic warning_count; + // `mpp_receiver_set` is always set by `MPPTask` and is used later. + MPPReceiverSetPtr mpp_receiver_set; std::vector coprocessor_readers; /// vector of SubqueriesForSets(such as join build subquery). /// The order of the vector is also the order of the subquery. From 1f2423f85d17034e10922b92d0b2ba6508eeaf36 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 18 Sep 2024 16:41:01 +0800 Subject: [PATCH 21/25] Update PipelineExecutorContext.cpp --- dbms/src/Flash/Executor/PipelineExecutorContext.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Flash/Executor/PipelineExecutorContext.cpp b/dbms/src/Flash/Executor/PipelineExecutorContext.cpp index a3a1b66fad6..c4405662931 100644 --- a/dbms/src/Flash/Executor/PipelineExecutorContext.cpp +++ b/dbms/src/Flash/Executor/PipelineExecutorContext.cpp @@ -183,8 +183,8 @@ void PipelineExecutorContext::cancel() // pipeline tasks waiting in the WAIT_FOR_NOTIFY state from never being notified. if (dag_context->tunnel_set) dag_context->tunnel_set->close(getTrimmedErrMsg(), false); - if (dag_context->mpp_receiver_set) - dag_context->mpp_receiver_set->cancel(); + if (auto mpp_receiver_set = dag_context->getMPPReceiverSet(); mpp_receiver_set) + mpp_receiver_set->cancel(); } cancelResultQueueIfNeed(); if likely (TaskScheduler::instance && !query_id.empty()) From c6033a8a2216d59a9d187d877de8c72eaca171ff Mon Sep 17 00:00:00 2001 From: gengliqi Date: Fri, 27 Sep 2024 18:18:04 +0800 Subject: [PATCH 22/25] update Signed-off-by: gengliqi --- dbms/src/Flash/Mpp/ReceivedMessage.cpp | 7 +- dbms/src/Flash/Mpp/ReceivedMessage.h | 5 +- dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp | 14 ++-- dbms/src/Flash/Mpp/ReceivedMessageQueue.h | 68 +++++-------------- dbms/src/Interpreters/Join.cpp | 4 +- .../src/Operators/SharedAggregateRestorer.cpp | 2 +- 6 files changed, 34 insertions(+), 66 deletions(-) diff --git a/dbms/src/Flash/Mpp/ReceivedMessage.cpp b/dbms/src/Flash/Mpp/ReceivedMessage.cpp index 732e603616a..b95ca0e472d 100644 --- a/dbms/src/Flash/Mpp/ReceivedMessage.cpp +++ b/dbms/src/Flash/Mpp/ReceivedMessage.cpp @@ -18,7 +18,7 @@ namespace DB { const std::vector & ReceivedMessage::getChunks(size_t stream_id) const { - if (remaining_consumers != nullptr) + if (fine_grained_consumer_size > 0) return fine_grained_chunks[stream_id]; else return chunks; @@ -31,17 +31,18 @@ ReceivedMessage::ReceivedMessage( const mpp::Error * error_ptr_, const String * resp_ptr_, std::vector && chunks_, - size_t fine_grained_consumer_size) + size_t fine_grained_consumer_size_) : source_index(source_index_) , req_info(req_info_) , packet(packet_) , error_ptr(error_ptr_) , resp_ptr(resp_ptr_) , chunks(chunks_) + , fine_grained_consumer_size(fine_grained_consumer_size_) + , remaining_consumers(fine_grained_consumer_size_) { if (fine_grained_consumer_size > 0) { - remaining_consumers = std::make_shared>(fine_grained_consumer_size); fine_grained_chunks.resize(fine_grained_consumer_size); if (packet->packet.chunks_size() > 0) { diff --git a/dbms/src/Flash/Mpp/ReceivedMessage.h b/dbms/src/Flash/Mpp/ReceivedMessage.h index a94531de0b8..25d4d80328a 100644 --- a/dbms/src/Flash/Mpp/ReceivedMessage.h +++ b/dbms/src/Flash/Mpp/ReceivedMessage.h @@ -33,7 +33,8 @@ class ReceivedMessage std::vector chunks; /// used for fine grained shuffle, remaining_consumers will be nullptr for non fine grained shuffle std::vector> fine_grained_chunks; - std::shared_ptr> remaining_consumers; + std::atomic remaining_consumers; + bool fine_grained_consumer_size; public: // Constructor that move chunks. @@ -50,7 +51,7 @@ class ReceivedMessage const String & getReqInfo() const { return req_info; } const mpp::Error * getErrorPtr() const { return error_ptr; } const String * getRespPtr(size_t stream_id) const { return stream_id == 0 ? resp_ptr : nullptr; } - std::shared_ptr> & getRemainingConsumers() { return remaining_consumers; } + std::atomic & getRemainingConsumers() { return remaining_consumers; } const std::vector & getChunks(size_t stream_id) const; const mpp::MPPDataPacket & getPacket() const { return packet->packet; } bool containUsefulMessage() const; diff --git a/dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp b/dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp index 656d58a7b66..bbff335d09c 100644 --- a/dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp +++ b/dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp @@ -104,7 +104,7 @@ ReceivedMessageQueue::ReceivedMessageQueue( : std::function([this](const ReceivedMessagePtr & element) { for (size_t i = 0; i < fine_grained_channel_size; ++i) { - auto result = msg_channels_for_fine_grained_shuffle[i].forcePush(element); + auto result = msg_channels_for_fine_grained_shuffle[i]->forcePush(element); RUNTIME_CHECK_MSG(result == MPMCQueueResult::OK, "push to fine grained channel must success"); } })) @@ -125,13 +125,13 @@ MPMCQueueResult ReceivedMessageQueue::pop(size_t stream_id, ReceivedMessagePtr & if (fine_grained_channel_size > 0) { if constexpr (need_wait) - res = msg_channels_for_fine_grained_shuffle[stream_id].pop(recv_msg); + res = msg_channels_for_fine_grained_shuffle[stream_id]->pop(recv_msg); else - res = msg_channels_for_fine_grained_shuffle[stream_id].tryPop(recv_msg); + res = msg_channels_for_fine_grained_shuffle[stream_id]->tryPop(recv_msg); if (res == MPMCQueueResult::OK) { - if (recv_msg->getRemainingConsumers()->fetch_sub(1) == 1) + if (recv_msg->getRemainingConsumers().fetch_sub(1) == 1) { #ifndef NDEBUG ReceivedMessagePtr original_msg; @@ -143,9 +143,9 @@ MPMCQueueResult ReceivedMessageQueue::pop(size_t stream_id, ReceivedMessagePtr & "The result of 'grpc_recv_queue->tryPop' is definitely not EMPTY."); if likely (original_msg != nullptr) RUNTIME_CHECK_MSG( - *original_msg->getRemainingConsumers() == 0, + original_msg->getRemainingConsumers() == 0, "Fine grained receiver pop a message that is not full consumed, remaining consumer: {}", - *original_msg->getRemainingConsumers()); + original_msg->getRemainingConsumers()); #else grpc_recv_queue.tryDequeue(); #endif @@ -157,7 +157,7 @@ MPMCQueueResult ReceivedMessageQueue::pop(size_t stream_id, ReceivedMessagePtr & if constexpr (!need_wait) { if (res == MPMCQueueResult::EMPTY) - setNotifyFuture(&msg_channels_for_fine_grained_shuffle[stream_id]); + setNotifyFuture(msg_channels_for_fine_grained_shuffle[stream_id].get()); } } } diff --git a/dbms/src/Flash/Mpp/ReceivedMessageQueue.h b/dbms/src/Flash/Mpp/ReceivedMessageQueue.h index 29097d56b45..3d235a294b2 100644 --- a/dbms/src/Flash/Mpp/ReceivedMessageQueue.h +++ b/dbms/src/Flash/Mpp/ReceivedMessageQueue.h @@ -57,63 +57,29 @@ enum class ReceiverMode Async }; -class GRPCNotifyQueue : public NotifyFuture +class GRPCNotifyRecvQueue final + : public NotifyFuture + , public GRPCRecvQueue { public: template - explicit GRPCNotifyQueue(const LoggerPtr & log_, Args &&... args) - : queue(log_, std::forward(args)...) + explicit GRPCNotifyRecvQueue(const LoggerPtr & log_, Args &&... args) + : GRPCRecvQueue(log_, std::forward(args)...) {} - void registerTask(TaskPtr && task) override { queue.registerPipeReadTask(std::move(task)); } - - MPMCQueueResult pop(ReceivedMessagePtr & data) { return queue.pop(data); } - - MPMCQueueResult tryPop(ReceivedMessagePtr & data) { return queue.tryPop(data); } - - MPMCQueueResult forcePush(ReceivedMessagePtr && data) { return queue.forcePush(std::move(data)); } - - MPMCQueueResult push(ReceivedMessagePtr && data) { return queue.push(std::move(data)); } - - MPMCQueueResult tryDequeue() { return queue.tryDequeue(); } - - MPMCQueueResult pushWithTag(ReceivedMessagePtr && data, GRPCKickTag * new_tag) - { - return queue.pushWithTag(std::move(data), new_tag); - } - - void setKickFuncForTest(GRPCKickFunc && func) { queue.setKickFuncForTest(std::move(func)); } - - bool finish() { return queue.finish(); } - bool cancel() { return queue.cancel(); } - - bool isWritable() const { return queue.isWritable(); } - - void registerPipeWriteTask(TaskPtr && task) { queue.registerPipeWriteTask(std::move(task)); } - -private: - GRPCRecvQueue queue; + void registerTask(TaskPtr && task) override { registerPipeReadTask(std::move(task)); } }; -class MSGChannel : public NotifyFuture +class MSGChannel final + : public NotifyFuture + , public LooseBoundedMPMCQueue { public: - void registerTask(TaskPtr && task) override { queue_ref.registerPipeReadTask(std::move(task)); } - - MPMCQueueResult pop(ReceivedMessagePtr & data) { return queue_ref.pop(data); } - - MPMCQueueResult tryPop(ReceivedMessagePtr & data) { return queue_ref.tryPop(data); } - - MPMCQueueResult forcePush(const ReceivedMessagePtr & data) { return queue_ref.forcePush(data); } - - bool finish() { return queue_ref.finish(); } - bool cancel() { return queue_ref.cancel(); } + MSGChannel() + : LooseBoundedMPMCQueue(std::numeric_limits::max()) + {} -private: - using QueueImpl = LooseBoundedMPMCQueue; - // these are unbounded queues. - std::unique_ptr queue = std::make_unique(std::numeric_limits::max()); - QueueImpl & queue_ref = *queue; + void registerTask(TaskPtr && task) override { registerPipeReadTask(std::move(task)); } }; class ReceivedMessageQueue @@ -147,7 +113,7 @@ class ReceivedMessageQueue grpc_recv_queue.finish(); /// msg_channels_for_fine_grained_shuffle must be finished after msg_channel is finished for (auto & channel : msg_channels_for_fine_grained_shuffle) - channel.finish(); + channel->finish(); } void cancel() @@ -155,7 +121,7 @@ class ReceivedMessageQueue grpc_recv_queue.cancel(); /// msg_channels_for_fine_grained_shuffle must be cancelled after msg_channel is cancelled for (auto & channel : msg_channels_for_fine_grained_shuffle) - channel.cancel(); + channel->cancel(); } bool isWritable() const { return grpc_recv_queue.isWritable(); } @@ -179,8 +145,8 @@ class ReceivedMessageQueue /// write: the writer first write the msg to msg_channel/grpc_recv_queue, if write success, then write msg to msg_channels_for_fine_grained_shuffle /// read: the reader read msg from msg_channels_for_fine_grained_shuffle, and reduce the `remaining_consumers` in msg, if `remaining_consumers` is 0, then /// remove the msg from msg_channel/grpc_recv_queue - std::vector msg_channels_for_fine_grained_shuffle; - GRPCNotifyQueue grpc_recv_queue; + std::vector> msg_channels_for_fine_grained_shuffle; + GRPCNotifyRecvQueue grpc_recv_queue; }; } // namespace DB diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index 466868041be..873772014f8 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -2157,7 +2157,7 @@ bool Join::isProbeFinishedForPipeline() const { if (!probe_finished) { - setNotifyFuture(wait_probe_finished_future); + setNotifyFuture(wait_probe_finished_future.get()); return false; } return true; @@ -2167,7 +2167,7 @@ bool Join::isBuildFinishedForPipeline() const { if (!build_finished) { - setNotifyFuture(wait_build_finished_future); + setNotifyFuture(wait_build_finished_future.get()); return false; } return true; diff --git a/dbms/src/Operators/SharedAggregateRestorer.cpp b/dbms/src/Operators/SharedAggregateRestorer.cpp index c726823d49a..741f1f72c6b 100644 --- a/dbms/src/Operators/SharedAggregateRestorer.cpp +++ b/dbms/src/Operators/SharedAggregateRestorer.cpp @@ -203,7 +203,7 @@ bool SharedAggregateRestorer::tryPop(Block & block) case SharedLoadResult::FINISHED: return true; case SharedLoadResult::WAIT: - setNotifyFuture(loader); + setNotifyFuture(loader.get()); return false; } } From bb18d7880e5afa84b5d418c0d0c768575d6f52a8 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Fri, 27 Sep 2024 18:41:13 +0800 Subject: [PATCH 23/25] update Signed-off-by: gengliqi --- dbms/src/Flash/Mpp/ReceivedMessage.cpp | 2 +- dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp | 2 +- dbms/src/Flash/Mpp/ReceivedMessageQueue.h | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dbms/src/Flash/Mpp/ReceivedMessage.cpp b/dbms/src/Flash/Mpp/ReceivedMessage.cpp index b95ca0e472d..eb1116e7481 100644 --- a/dbms/src/Flash/Mpp/ReceivedMessage.cpp +++ b/dbms/src/Flash/Mpp/ReceivedMessage.cpp @@ -38,8 +38,8 @@ ReceivedMessage::ReceivedMessage( , error_ptr(error_ptr_) , resp_ptr(resp_ptr_) , chunks(chunks_) - , fine_grained_consumer_size(fine_grained_consumer_size_) , remaining_consumers(fine_grained_consumer_size_) + , fine_grained_consumer_size(fine_grained_consumer_size_) { if (fine_grained_consumer_size > 0) { diff --git a/dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp b/dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp index bbff335d09c..fd15dc67e8a 100644 --- a/dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp +++ b/dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp @@ -114,7 +114,7 @@ ReceivedMessageQueue::ReceivedMessageQueue( assert(fine_grained_channel_size > 0); msg_channels_for_fine_grained_shuffle.reserve(fine_grained_channel_size); for (size_t i = 0; i < fine_grained_channel_size; ++i) - msg_channels_for_fine_grained_shuffle.emplace_back(); + msg_channels_for_fine_grained_shuffle.emplace_back(std::make_unique()); } } diff --git a/dbms/src/Flash/Mpp/ReceivedMessageQueue.h b/dbms/src/Flash/Mpp/ReceivedMessageQueue.h index 3d235a294b2..f1ac80c7612 100644 --- a/dbms/src/Flash/Mpp/ReceivedMessageQueue.h +++ b/dbms/src/Flash/Mpp/ReceivedMessageQueue.h @@ -145,7 +145,7 @@ class ReceivedMessageQueue /// write: the writer first write the msg to msg_channel/grpc_recv_queue, if write success, then write msg to msg_channels_for_fine_grained_shuffle /// read: the reader read msg from msg_channels_for_fine_grained_shuffle, and reduce the `remaining_consumers` in msg, if `remaining_consumers` is 0, then /// remove the msg from msg_channel/grpc_recv_queue - std::vector> msg_channels_for_fine_grained_shuffle; + std::vector> msg_channels_for_fine_grained_shuffle; GRPCNotifyRecvQueue grpc_recv_queue; }; From c85801b3e133bfb94c986e8bf246c630e7cd1b9c Mon Sep 17 00:00:00 2001 From: gengliqi Date: Fri, 27 Sep 2024 18:44:56 +0800 Subject: [PATCH 24/25] update name Signed-off-by: gengliqi --- dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp | 2 +- dbms/src/Flash/Mpp/ReceivedMessageQueue.h | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp b/dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp index fd15dc67e8a..f89dc988826 100644 --- a/dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp +++ b/dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp @@ -114,7 +114,7 @@ ReceivedMessageQueue::ReceivedMessageQueue( assert(fine_grained_channel_size > 0); msg_channels_for_fine_grained_shuffle.reserve(fine_grained_channel_size); for (size_t i = 0; i < fine_grained_channel_size; ++i) - msg_channels_for_fine_grained_shuffle.emplace_back(std::make_unique()); + msg_channels_for_fine_grained_shuffle.emplace_back(std::make_unique()); } } diff --git a/dbms/src/Flash/Mpp/ReceivedMessageQueue.h b/dbms/src/Flash/Mpp/ReceivedMessageQueue.h index f1ac80c7612..d0abb200455 100644 --- a/dbms/src/Flash/Mpp/ReceivedMessageQueue.h +++ b/dbms/src/Flash/Mpp/ReceivedMessageQueue.h @@ -70,12 +70,12 @@ class GRPCNotifyRecvQueue final void registerTask(TaskPtr && task) override { registerPipeReadTask(std::move(task)); } }; -class MSGChannel final +class MSGUnboundedQueue final : public NotifyFuture , public LooseBoundedMPMCQueue { public: - MSGChannel() + MSGUnboundedQueue() : LooseBoundedMPMCQueue(std::numeric_limits::max()) {} @@ -145,7 +145,7 @@ class ReceivedMessageQueue /// write: the writer first write the msg to msg_channel/grpc_recv_queue, if write success, then write msg to msg_channels_for_fine_grained_shuffle /// read: the reader read msg from msg_channels_for_fine_grained_shuffle, and reduce the `remaining_consumers` in msg, if `remaining_consumers` is 0, then /// remove the msg from msg_channel/grpc_recv_queue - std::vector> msg_channels_for_fine_grained_shuffle; + std::vector> msg_channels_for_fine_grained_shuffle; GRPCNotifyRecvQueue grpc_recv_queue; }; From af512d35bf6d26eb91db33b10fd0102cfb5aa1de Mon Sep 17 00:00:00 2001 From: gengliqi Date: Fri, 27 Sep 2024 21:45:39 +0800 Subject: [PATCH 25/25] fix Signed-off-by: gengliqi --- dbms/src/Flash/Mpp/ReceivedMessage.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Flash/Mpp/ReceivedMessage.h b/dbms/src/Flash/Mpp/ReceivedMessage.h index 25d4d80328a..664a1dd4a64 100644 --- a/dbms/src/Flash/Mpp/ReceivedMessage.h +++ b/dbms/src/Flash/Mpp/ReceivedMessage.h @@ -34,7 +34,7 @@ class ReceivedMessage /// used for fine grained shuffle, remaining_consumers will be nullptr for non fine grained shuffle std::vector> fine_grained_chunks; std::atomic remaining_consumers; - bool fine_grained_consumer_size; + size_t fine_grained_consumer_size; public: // Constructor that move chunks.