From fa590d818c6a22787fbbe168298e79162cf41def Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 22 May 2024 18:02:19 +0800 Subject: [PATCH 01/11] add notify_future for mpptunnel --- dbms/src/Common/GRPCQueue.h | 6 ++++++ dbms/src/Flash/Mpp/LocalRequestHandler.h | 3 +++ dbms/src/Flash/Mpp/MPPTunnel.h | 24 +++++++++++++++++++++-- dbms/src/Flash/Mpp/ReceivedMessageQueue.h | 3 +++ 4 files changed, 34 insertions(+), 2 deletions(-) diff --git a/dbms/src/Common/GRPCQueue.h b/dbms/src/Common/GRPCQueue.h index 333c94a5381..497dab3a4ad 100644 --- a/dbms/src/Common/GRPCQueue.h +++ b/dbms/src/Common/GRPCQueue.h @@ -139,6 +139,9 @@ class GRPCSendQueue bool isWritable() const { return send_queue.isWritable(); } + void registerPipeReadTask(TaskPtr && task) { send_queue.registerPipeReadTask(std::move(task)); } + void registerPipeWriteTask(TaskPtr && task) { send_queue.registerPipeWriteTask(std::move(task)); } + private: friend class tests::TestGRPCSendQueue; @@ -297,6 +300,9 @@ class GRPCRecvQueue bool isWritable() const { return recv_queue.isWritable(); } + void registerPipeReadTask(TaskPtr && task) { recv_queue.registerPipeReadTask(std::move(task)); } + void registerPipeWriteTask(TaskPtr && task) { recv_queue.registerPipeWriteTask(std::move(task)); } + private: friend class tests::TestGRPCRecvQueue; diff --git a/dbms/src/Flash/Mpp/LocalRequestHandler.h b/dbms/src/Flash/Mpp/LocalRequestHandler.h index 20f52acc5b1..a6422d79880 100644 --- a/dbms/src/Flash/Mpp/LocalRequestHandler.h +++ b/dbms/src/Flash/Mpp/LocalRequestHandler.h @@ -42,6 +42,9 @@ struct LocalRequestHandler bool isWritable() const { return msg_queue->isWritable(); } + void registerPipeReadTask(TaskPtr && task) const { msg_queue->registerPipeReadTask(std::move(task)); } + void registerPipeWriteTask(TaskPtr && task) const { msg_queue->registerPipeWriteTask(std::move(task)); } + void writeDone(bool meet_error, const String & local_err_msg) const { notify_write_done(meet_error, local_err_msg); diff --git a/dbms/src/Flash/Mpp/MPPTunnel.h b/dbms/src/Flash/Mpp/MPPTunnel.h index 58062b7a170..231bcee32e4 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.h +++ b/dbms/src/Flash/Mpp/MPPTunnel.h @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -92,10 +93,12 @@ enum class TunnelSenderMode /// TunnelSender is responsible for consuming data from Tunnel's internal send_queue and do the actual sending work /// After TunnelSend finished its work, either normally or abnormally, set ConsumerState to inform Tunnel -class TunnelSender : private boost::noncopyable +class TunnelSender + : private boost::noncopyable + , public NotifyFuture { public: - virtual ~TunnelSender() = default; + ~TunnelSender() override = default; TunnelSender( MemoryTrackerPtr & memory_tracker_, const LoggerPtr & log_, @@ -193,6 +196,8 @@ class SyncTunnelSender : public TunnelSender bool isWritable() const override { return send_queue.isWritable(); } + void registerTask(TaskPtr && task) override { send_queue.registerPipeWriteTask(std::move(task)); } + private: friend class tests::TestMPPTunnel; void sendJob(PacketWriter * writer); @@ -254,6 +259,8 @@ class AsyncTunnelSender : public TunnelSender void subDataSizeMetric(size_t size) { ::DB::MPPTunnelMetric::subDataSizeMetric(*data_size_in_queue, size); } + void registerTask(TaskPtr && task) override { queue.registerPipeWriteTask(std::move(task)); } + private: GRPCSendQueue queue; }; @@ -311,6 +318,17 @@ class LocalTunnelSenderV2 : public TunnelSender } } + void registerTask(TaskPtr && task) override + { + if constexpr (local_only) + local_request_handler.registerPipeWriteTask(std::move(task)); + else + { + std::lock_guard lock(mu); + local_request_handler.registerPipeWriteTask(std::move(task)); + } + } + private: friend class tests::TestMPPTunnel; @@ -405,6 +423,8 @@ class LocalTunnelSenderV1 : public TunnelSender bool isWritable() const override { return send_queue.isWritable(); } + void registerTask(TaskPtr && task) override { send_queue.registerPipeWriteTask(std::move(task)); } + private: bool cancel_reason_sent = false; LooseBoundedMPMCQueue send_queue; diff --git a/dbms/src/Flash/Mpp/ReceivedMessageQueue.h b/dbms/src/Flash/Mpp/ReceivedMessageQueue.h index 6fd41463d12..c975e4d2ab1 100644 --- a/dbms/src/Flash/Mpp/ReceivedMessageQueue.h +++ b/dbms/src/Flash/Mpp/ReceivedMessageQueue.h @@ -99,6 +99,9 @@ class ReceivedMessageQueue bool isWritable() const { return grpc_recv_queue.isWritable(); } + void registerPipeReadTask(TaskPtr && task) { grpc_recv_queue.registerPipeReadTask(std::move(task)); } + void registerPipeWriteTask(TaskPtr && task) { grpc_recv_queue.registerPipeWriteTask(std::move(task)); } + #ifndef DBMS_PUBLIC_GTEST private: #endif From 62d3add9e182a604aa579004fe36bab39acef244 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 22 May 2024 18:31:09 +0800 Subject: [PATCH 02/11] add for writer --- .../src/Flash/Coprocessor/DAGResponseWriter.h | 3 ++ dbms/src/Flash/Coprocessor/StreamWriter.h | 7 +++- .../StreamingDAGResponseWriter.cpp | 6 +++ .../Coprocessor/StreamingDAGResponseWriter.h | 1 + dbms/src/Flash/Coprocessor/WaitResult.h | 25 ++++++++++++ .../tests/gtest_streaming_writer.cpp | 3 +- .../gtest_ti_remote_block_inputstream.cpp | 1 + .../Mpp/BroadcastOrPassThroughWriter.cpp | 6 +++ .../Flash/Mpp/BroadcastOrPassThroughWriter.h | 1 + .../Flash/Mpp/FineGrainedShuffleWriter.cpp | 6 +++ dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h | 1 + dbms/src/Flash/Mpp/HashPartitionWriter.cpp | 6 +++ dbms/src/Flash/Mpp/HashPartitionWriter.h | 1 + dbms/src/Flash/Mpp/MPPTunnel.cpp | 39 +++++++++++++++++++ dbms/src/Flash/Mpp/MPPTunnel.h | 3 ++ dbms/src/Flash/Mpp/MPPTunnelSet.cpp | 12 ++++++ dbms/src/Flash/Mpp/MPPTunnelSet.h | 2 + dbms/src/Flash/Mpp/MPPTunnelSetWriter.h | 7 ++++ .../Mpp/tests/gtest_mpp_exchange_writer.cpp | 3 +- 19 files changed, 129 insertions(+), 4 deletions(-) create mode 100644 dbms/src/Flash/Coprocessor/WaitResult.h diff --git a/dbms/src/Flash/Coprocessor/DAGResponseWriter.h b/dbms/src/Flash/Coprocessor/DAGResponseWriter.h index e4be0eb59b9..e8ac100a59e 100644 --- a/dbms/src/Flash/Coprocessor/DAGResponseWriter.h +++ b/dbms/src/Flash/Coprocessor/DAGResponseWriter.h @@ -17,6 +17,7 @@ #include #include #include +#include namespace DB { @@ -37,6 +38,8 @@ class DAGResponseWriter // ``` virtual bool isWritable() const { throw Exception("Unsupport"); } + virtual WaitResult waitForWritable() const { throw Exception("Unsupport"); } + /// flush cached blocks for batch writer virtual void flush() = 0; virtual ~DAGResponseWriter() = default; diff --git a/dbms/src/Flash/Coprocessor/StreamWriter.h b/dbms/src/Flash/Coprocessor/StreamWriter.h index 4eb62b27144..40061e2ed0f 100644 --- a/dbms/src/Flash/Coprocessor/StreamWriter.h +++ b/dbms/src/Flash/Coprocessor/StreamWriter.h @@ -16,6 +16,7 @@ #include #include +#include #ifdef __clang__ #pragma clang diagnostic push #pragma clang diagnostic ignored "-Wdeprecated-declarations" @@ -57,7 +58,8 @@ struct CopStreamWriter if (!writer->Write(resp)) throw Exception("Failed to write resp"); } - bool isWritable() const { throw Exception("Unsupport async write"); } + static bool isWritable() { throw Exception("Unsupport async write"); } + static WaitResult waitForWritable() { throw Exception("Unsupport async write"); } }; struct BatchCopStreamWriter @@ -81,7 +83,8 @@ struct BatchCopStreamWriter if (!writer->Write(resp)) throw Exception("Failed to write resp"); } - bool isWritable() const { throw Exception("Unsupport async write"); } + static bool isWritable() { throw Exception("Unsupport async write"); } + static WaitResult waitForWritable() { throw Exception("Unsupport async write"); } }; using CopStreamWriterPtr = std::shared_ptr; diff --git a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp index b1169ca46bc..a2dc4e42f17 100644 --- a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp +++ b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp @@ -73,6 +73,12 @@ bool StreamingDAGResponseWriter::isWritable() const return writer->isWritable(); } +template +WaitResult StreamingDAGResponseWriter::waitForWritable() const +{ + return writer->waitForWritable(); +} + template void StreamingDAGResponseWriter::write(const Block & block) { diff --git a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h index e6f1e61b59e..56b355f5af7 100644 --- a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h +++ b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h @@ -38,6 +38,7 @@ class StreamingDAGResponseWriter : public DAGResponseWriter DAGContext & dag_context_); void write(const Block & block) override; bool isWritable() const override; + WaitResult waitForWritable() const override; void flush() override; private: diff --git a/dbms/src/Flash/Coprocessor/WaitResult.h b/dbms/src/Flash/Coprocessor/WaitResult.h new file mode 100644 index 00000000000..ed45e89111e --- /dev/null +++ b/dbms/src/Flash/Coprocessor/WaitResult.h @@ -0,0 +1,25 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +namespace DB +{ +enum class WaitResult +{ + Ready, + WaitForPolling, + WaitForNotify +}; +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_streaming_writer.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_streaming_writer.cpp index c337f72de60..cf9b5ddee34 100644 --- a/dbms/src/Flash/Coprocessor/tests/gtest_streaming_writer.cpp +++ b/dbms/src/Flash/Coprocessor/tests/gtest_streaming_writer.cpp @@ -93,7 +93,8 @@ struct MockStreamWriter {} void write(tipb::SelectResponse & response) { checker(response); } - bool isWritable() const { throw Exception("Unsupport async write"); } + static bool isWritable() { throw Exception("Unsupport async write"); } + static WaitResult waitForWritable() { throw Exception("Unsupport async write"); } private: MockStreamWriterChecker checker; diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp index 9448bd8477b..0211faa2c5b 100644 --- a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp +++ b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp @@ -148,6 +148,7 @@ struct MockWriter } static uint16_t getPartitionNum() { return 1; } static bool isWritable() { throw Exception("Unsupport async write"); } + static WaitResult waitForWritable() { throw Exception("Unsupport async write"); } std::vector result_field_types; diff --git a/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp b/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp index c2b42854214..06c7d20e0c5 100644 --- a/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp +++ b/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp @@ -75,6 +75,12 @@ bool BroadcastOrPassThroughWriter::isWritable() const return writer->isWritable(); } +template +WaitResult BroadcastOrPassThroughWriter::waitForWritable() const +{ + return writer->waitForWritable(); +} + template void BroadcastOrPassThroughWriter::write(const Block & block) { diff --git a/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.h b/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.h index 9a28d13a461..e0a9d733e24 100644 --- a/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.h +++ b/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.h @@ -38,6 +38,7 @@ class BroadcastOrPassThroughWriter : public DAGResponseWriter tipb::ExchangeType exchange_type_); void write(const Block & block) override; bool isWritable() const override; + WaitResult waitForWritable() const override; void flush() override; private: diff --git a/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp index cff3fe27e56..0893b7b77f1 100644 --- a/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp +++ b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp @@ -101,6 +101,12 @@ bool FineGrainedShuffleWriter::isWritable() const return writer->isWritable(); } +template +WaitResult FineGrainedShuffleWriter::waitForWritable() const +{ + return writer->waitForWritable(); +} + template void FineGrainedShuffleWriter::write(const Block & block) { diff --git a/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h index a2bf4cf40d6..6cf4fc3e326 100644 --- a/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h +++ b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h @@ -46,6 +46,7 @@ class FineGrainedShuffleWriter : public DAGResponseWriter void prepare(const Block & sample_block) override; void write(const Block & block) override; bool isWritable() const override; + WaitResult waitForWritable() const override; void flush() override; private: diff --git a/dbms/src/Flash/Mpp/HashPartitionWriter.cpp b/dbms/src/Flash/Mpp/HashPartitionWriter.cpp index a46c5d3718a..eb976aed4c9 100644 --- a/dbms/src/Flash/Mpp/HashPartitionWriter.cpp +++ b/dbms/src/Flash/Mpp/HashPartitionWriter.cpp @@ -98,6 +98,12 @@ bool HashPartitionWriter::isWritable() const return writer->isWritable(); } +template +WaitResult HashPartitionWriter::waitForWritable() const +{ + return writer->waitForWritable(); +} + template void HashPartitionWriter::writeImplV1(const Block & block) { diff --git a/dbms/src/Flash/Mpp/HashPartitionWriter.h b/dbms/src/Flash/Mpp/HashPartitionWriter.h index b0d31b7b2e7..d11dd4deb9a 100644 --- a/dbms/src/Flash/Mpp/HashPartitionWriter.h +++ b/dbms/src/Flash/Mpp/HashPartitionWriter.h @@ -39,6 +39,7 @@ class HashPartitionWriter : public DAGResponseWriter tipb::CompressionMode compression_mode_); void write(const Block & block) override; bool isWritable() const override; + WaitResult waitForWritable() const override; void flush() override; private: diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index c0fe94efb52..3226e353f85 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -22,6 +22,7 @@ #include #include +#include "Flash/Pipeline/Schedule/Tasks/NotifyFuture.h" namespace DB { @@ -413,6 +414,44 @@ bool MPPTunnel::isWritable() const } } +WaitResult MPPTunnel::waitForWritable() const +{ + std::unique_lock lk(mu); + switch (status) + { + case TunnelStatus::Unconnected: + { + if (timeout.count() > 0) + { + fiu_do_on(FailPoints::random_tunnel_wait_timeout_failpoint, + throw Exception(fmt::format("{} is timeout", tunnel_id));); + if (unlikely(!timeout_stopwatch)) + timeout_stopwatch.emplace(CLOCK_MONOTONIC_COARSE); + if (unlikely(timeout_stopwatch->elapsed() > timeout_nanoseconds)) + throw Exception(fmt::format("{} is timeout", tunnel_id)); + } + return WaitResult::WaitForPolling; + } + case TunnelStatus::Connected: + case TunnelStatus::WaitingForSenderFinish: + RUNTIME_CHECK_MSG(tunnel_sender != nullptr, "write to tunnel {} which is already closed.", tunnel_id); + if (!tunnel_sender->isWritable()) + { + setNotifyFuture(tunnel_sender); + return WaitResult::WaitForNotify; + } + return WaitResult::Ready; + case TunnelStatus::Finished: + RUNTIME_CHECK_MSG(tunnel_sender != nullptr, "write to tunnel {} which is already closed.", tunnel_id); + throw Exception(fmt::format( + "write to tunnel {} which is already closed, {}", + tunnel_id, + tunnel_sender->isConsumerFinished() ? tunnel_sender->getConsumerFinishMsg() : "")); + default: + RUNTIME_ASSERT(false, log, "Unsupported tunnel status: {}", magic_enum::enum_name(status)); + } +} + std::string_view MPPTunnel::statusToString() { return magic_enum::enum_name(status); diff --git a/dbms/src/Flash/Mpp/MPPTunnel.h b/dbms/src/Flash/Mpp/MPPTunnel.h index 231bcee32e4..47d513c7812 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.h +++ b/dbms/src/Flash/Mpp/MPPTunnel.h @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -500,6 +501,8 @@ class MPPTunnel : private boost::noncopyable void forceWrite(TrackedMppDataPacketPtr && data); bool isWritable() const; + WaitResult waitForWritable() const; + // finish the writing, and wait until the sender finishes. void writeDone(); diff --git a/dbms/src/Flash/Mpp/MPPTunnelSet.cpp b/dbms/src/Flash/Mpp/MPPTunnelSet.cpp index 6078ec8f2a4..a499444075d 100644 --- a/dbms/src/Flash/Mpp/MPPTunnelSet.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnelSet.cpp @@ -18,6 +18,7 @@ #include #include #include +#include "Flash/Mpp/MPPTunnel.h" namespace DB { @@ -78,6 +79,17 @@ bool MPPTunnelSetBase::isWritable() const return true; } +template +WaitResult MPPTunnelSetBase::waitForWritable() const +{ + for (const auto & tunnel : tunnels) + { + if (auto res = tunnel->waitForWritable(); res != WaitResult::Ready) + return res; + } + return WaitResult::Ready; +} + template void MPPTunnelSetBase::registerTunnel(const MPPTaskId & receiver_task_id, const TunnelPtr & tunnel) { diff --git a/dbms/src/Flash/Mpp/MPPTunnelSet.h b/dbms/src/Flash/Mpp/MPPTunnelSet.h index ee2c65768f7..606a1a0281b 100644 --- a/dbms/src/Flash/Mpp/MPPTunnelSet.h +++ b/dbms/src/Flash/Mpp/MPPTunnelSet.h @@ -62,6 +62,8 @@ class MPPTunnelSetBase : private boost::noncopyable bool isWritable() const; + WaitResult waitForWritable() const; + bool isLocal(size_t index) const; private: diff --git a/dbms/src/Flash/Mpp/MPPTunnelSetWriter.h b/dbms/src/Flash/Mpp/MPPTunnelSetWriter.h index ce923a88410..6bb06a0c190 100644 --- a/dbms/src/Flash/Mpp/MPPTunnelSetWriter.h +++ b/dbms/src/Flash/Mpp/MPPTunnelSetWriter.h @@ -71,6 +71,8 @@ class MPPTunnelSetWriterBase : private boost::noncopyable virtual bool isWritable() const = 0; + virtual WaitResult waitForWritable() const = 0; + protected: virtual void writeToTunnel(TrackedMppDataPacketPtr && data, size_t index) = 0; virtual void writeToTunnel(tipb::SelectResponse & response, size_t index) = 0; @@ -94,6 +96,9 @@ class SyncMPPTunnelSetWriter : public MPPTunnelSetWriterBase // For sync writer, `isWritable` will not be called, so an exception is thrown here. bool isWritable() const override { throw Exception("Unsupport sync writer"); } + // For sync writer, `waitForWritable` will not be called, so an exception is thrown here. + WaitResult waitForWritable() const override { throw Exception("Unsupport sync writer"); } + protected: void writeToTunnel(TrackedMppDataPacketPtr && data, size_t index) override; void writeToTunnel(tipb::SelectResponse & response, size_t index) override; @@ -112,6 +117,8 @@ class AsyncMPPTunnelSetWriter : public MPPTunnelSetWriterBase bool isWritable() const override { return mpp_tunnel_set->isWritable(); } + WaitResult waitForWritable() const override { return mpp_tunnel_set->waitForWritable(); } + protected: void writeToTunnel(TrackedMppDataPacketPtr && data, size_t index) override; void writeToTunnel(tipb::SelectResponse & response, size_t index) override; diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp index fd3b3593321..31520093a78 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp @@ -224,7 +224,8 @@ struct MockExchangeWriter // make only part 0 use local tunnel return index == 0; } - bool isWritable() const { throw Exception("Unsupport async write"); } + static bool isWritable() { throw Exception("Unsupport async write"); } + static WaitResult waitForWritable() { throw Exception("Unsupport async write"); } private: MockExchangeWriterChecker checker; From e7d8c9be22c02ab2127651f7530d4c67f94b355a Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 22 May 2024 18:35:45 +0800 Subject: [PATCH 03/11] for exchange sender --- dbms/src/Operators/ExchangeSenderSinkOp.cpp | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/dbms/src/Operators/ExchangeSenderSinkOp.cpp b/dbms/src/Operators/ExchangeSenderSinkOp.cpp index 664e22e3cc5..4622e97d864 100644 --- a/dbms/src/Operators/ExchangeSenderSinkOp.cpp +++ b/dbms/src/Operators/ExchangeSenderSinkOp.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include "Flash/Coprocessor/WaitResult.h" namespace DB { @@ -41,12 +42,21 @@ OperatorStatus ExchangeSenderSinkOp::writeImpl(Block && block) OperatorStatus ExchangeSenderSinkOp::prepareImpl() { - return writer->isWritable() ? OperatorStatus::NEED_INPUT : OperatorStatus::WAITING; + return awaitImpl(); } OperatorStatus ExchangeSenderSinkOp::awaitImpl() { - return writer->isWritable() ? OperatorStatus::NEED_INPUT : OperatorStatus::WAITING; + auto res = writer->waitForWritable(); + switch (res) + { + case WaitResult::Ready: + return OperatorStatus::NEED_INPUT; + case WaitResult::WaitForPolling: + return OperatorStatus::WAITING; + case WaitResult::WaitForNotify: + return OperatorStatus::WAIT_FOR_NOTIFY; + } } } // namespace DB From 2ffee76722158090b5d3e698bdeda243903c2f36 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 22 May 2024 18:52:03 +0800 Subject: [PATCH 04/11] remove writable --- .../src/Flash/Coprocessor/DAGResponseWriter.h | 9 +++-- dbms/src/Flash/Coprocessor/StreamWriter.h | 2 -- .../StreamingDAGResponseWriter.cpp | 6 ---- .../Coprocessor/StreamingDAGResponseWriter.h | 1 - .../tests/gtest_streaming_writer.cpp | 1 - .../gtest_ti_remote_block_inputstream.cpp | 1 - .../Mpp/BroadcastOrPassThroughWriter.cpp | 6 ---- .../Flash/Mpp/BroadcastOrPassThroughWriter.h | 1 - .../Flash/Mpp/FineGrainedShuffleWriter.cpp | 6 ---- dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h | 1 - dbms/src/Flash/Mpp/HashPartitionWriter.cpp | 6 ---- dbms/src/Flash/Mpp/HashPartitionWriter.h | 1 - dbms/src/Flash/Mpp/MPPTunnel.cpp | 34 ------------------- dbms/src/Flash/Mpp/MPPTunnel.h | 9 +++-- dbms/src/Flash/Mpp/MPPTunnelSet.cpp | 12 ------- dbms/src/Flash/Mpp/MPPTunnelSet.h | 2 -- dbms/src/Flash/Mpp/MPPTunnelSetWriter.h | 7 ---- .../Mpp/tests/gtest_mpp_exchange_writer.cpp | 1 - dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp | 8 ++--- dbms/src/Operators/ExchangeSenderSinkOp.cpp | 18 ++++++---- dbms/src/Operators/ExchangeSenderSinkOp.h | 3 ++ 21 files changed, 26 insertions(+), 109 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGResponseWriter.h b/dbms/src/Flash/Coprocessor/DAGResponseWriter.h index e8ac100a59e..b9a756e365d 100644 --- a/dbms/src/Flash/Coprocessor/DAGResponseWriter.h +++ b/dbms/src/Flash/Coprocessor/DAGResponseWriter.h @@ -15,9 +15,9 @@ #pragma once #include +#include #include #include -#include namespace DB { @@ -31,13 +31,12 @@ class DAGResponseWriter virtual void prepare(const Block &){}; virtual void write(const Block & block) = 0; - // For async writer, `isWritable` need to be called before calling `write`. + // For async writer, `waitForWritable` need to be called before calling `write`. // ``` - // while (!isWritable()) {} + // auto res = waitForWritable(); + // switch (res) case... // write(block); // ``` - virtual bool isWritable() const { throw Exception("Unsupport"); } - virtual WaitResult waitForWritable() const { throw Exception("Unsupport"); } /// flush cached blocks for batch writer diff --git a/dbms/src/Flash/Coprocessor/StreamWriter.h b/dbms/src/Flash/Coprocessor/StreamWriter.h index 40061e2ed0f..41383ee49c2 100644 --- a/dbms/src/Flash/Coprocessor/StreamWriter.h +++ b/dbms/src/Flash/Coprocessor/StreamWriter.h @@ -58,7 +58,6 @@ struct CopStreamWriter if (!writer->Write(resp)) throw Exception("Failed to write resp"); } - static bool isWritable() { throw Exception("Unsupport async write"); } static WaitResult waitForWritable() { throw Exception("Unsupport async write"); } }; @@ -83,7 +82,6 @@ struct BatchCopStreamWriter if (!writer->Write(resp)) throw Exception("Failed to write resp"); } - static bool isWritable() { throw Exception("Unsupport async write"); } static WaitResult waitForWritable() { throw Exception("Unsupport async write"); } }; diff --git a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp index a2dc4e42f17..a6f39cb25dc 100644 --- a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp +++ b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp @@ -67,12 +67,6 @@ void StreamingDAGResponseWriter::flush() encodeThenWriteBlocks(); } -template -bool StreamingDAGResponseWriter::isWritable() const -{ - return writer->isWritable(); -} - template WaitResult StreamingDAGResponseWriter::waitForWritable() const { diff --git a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h index 56b355f5af7..61ca9a71517 100644 --- a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h +++ b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h @@ -37,7 +37,6 @@ class StreamingDAGResponseWriter : public DAGResponseWriter Int64 batch_send_min_limit_, DAGContext & dag_context_); void write(const Block & block) override; - bool isWritable() const override; WaitResult waitForWritable() const override; void flush() override; diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_streaming_writer.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_streaming_writer.cpp index cf9b5ddee34..ed897c94fa0 100644 --- a/dbms/src/Flash/Coprocessor/tests/gtest_streaming_writer.cpp +++ b/dbms/src/Flash/Coprocessor/tests/gtest_streaming_writer.cpp @@ -93,7 +93,6 @@ struct MockStreamWriter {} void write(tipb::SelectResponse & response) { checker(response); } - static bool isWritable() { throw Exception("Unsupport async write"); } static WaitResult waitForWritable() { throw Exception("Unsupport async write"); } private: diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp index 0211faa2c5b..7ff5d30129b 100644 --- a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp +++ b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp @@ -147,7 +147,6 @@ struct MockWriter queue->push(tracked_packet); } static uint16_t getPartitionNum() { return 1; } - static bool isWritable() { throw Exception("Unsupport async write"); } static WaitResult waitForWritable() { throw Exception("Unsupport async write"); } std::vector result_field_types; diff --git a/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp b/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp index 06c7d20e0c5..b4af6b40530 100644 --- a/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp +++ b/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp @@ -69,12 +69,6 @@ void BroadcastOrPassThroughWriter::flush() writeBlocks(); } -template -bool BroadcastOrPassThroughWriter::isWritable() const -{ - return writer->isWritable(); -} - template WaitResult BroadcastOrPassThroughWriter::waitForWritable() const { diff --git a/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.h b/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.h index e0a9d733e24..be615c4c21c 100644 --- a/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.h +++ b/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.h @@ -37,7 +37,6 @@ class BroadcastOrPassThroughWriter : public DAGResponseWriter tipb::CompressionMode compression_mode_, tipb::ExchangeType exchange_type_); void write(const Block & block) override; - bool isWritable() const override; WaitResult waitForWritable() const override; void flush() override; diff --git a/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp index 0893b7b77f1..200f60d4655 100644 --- a/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp +++ b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp @@ -95,12 +95,6 @@ void FineGrainedShuffleWriter::flush() batchWriteFineGrainedShuffle(); } -template -bool FineGrainedShuffleWriter::isWritable() const -{ - return writer->isWritable(); -} - template WaitResult FineGrainedShuffleWriter::waitForWritable() const { diff --git a/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h index 6cf4fc3e326..b050245cdca 100644 --- a/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h +++ b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h @@ -45,7 +45,6 @@ class FineGrainedShuffleWriter : public DAGResponseWriter tipb::CompressionMode compression_mode_); void prepare(const Block & sample_block) override; void write(const Block & block) override; - bool isWritable() const override; WaitResult waitForWritable() const override; void flush() override; diff --git a/dbms/src/Flash/Mpp/HashPartitionWriter.cpp b/dbms/src/Flash/Mpp/HashPartitionWriter.cpp index eb976aed4c9..e567ab5ef89 100644 --- a/dbms/src/Flash/Mpp/HashPartitionWriter.cpp +++ b/dbms/src/Flash/Mpp/HashPartitionWriter.cpp @@ -92,12 +92,6 @@ void HashPartitionWriter::flush() } } -template -bool HashPartitionWriter::isWritable() const -{ - return writer->isWritable(); -} - template WaitResult HashPartitionWriter::waitForWritable() const { diff --git a/dbms/src/Flash/Mpp/HashPartitionWriter.h b/dbms/src/Flash/Mpp/HashPartitionWriter.h index d11dd4deb9a..8e36d28234d 100644 --- a/dbms/src/Flash/Mpp/HashPartitionWriter.h +++ b/dbms/src/Flash/Mpp/HashPartitionWriter.h @@ -38,7 +38,6 @@ class HashPartitionWriter : public DAGResponseWriter MPPDataPacketVersion data_codec_version_, tipb::CompressionMode compression_mode_); void write(const Block & block) override; - bool isWritable() const override; WaitResult waitForWritable() const override; void flush() override; diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index 3226e353f85..9a16322d241 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -22,7 +22,6 @@ #include #include -#include "Flash/Pipeline/Schedule/Tasks/NotifyFuture.h" namespace DB { @@ -381,39 +380,6 @@ void MPPTunnel::waitUntilConnectedOrFinished(std::unique_lock & lk) throw Exception(fmt::format("MPPTunnel {} can not be connected because MPPTask is cancelled", tunnel_id)); } -bool MPPTunnel::isWritable() const -{ - std::unique_lock lk(mu); - switch (status) - { - case TunnelStatus::Unconnected: - { - if (timeout.count() > 0) - { - fiu_do_on(FailPoints::random_tunnel_wait_timeout_failpoint, - throw Exception(fmt::format("{} is timeout", tunnel_id));); - if (unlikely(!timeout_stopwatch)) - timeout_stopwatch.emplace(CLOCK_MONOTONIC_COARSE); - if (unlikely(timeout_stopwatch->elapsed() > timeout_nanoseconds)) - throw Exception(fmt::format("{} is timeout", tunnel_id)); - } - return false; - } - case TunnelStatus::Connected: - case TunnelStatus::WaitingForSenderFinish: - RUNTIME_CHECK_MSG(tunnel_sender != nullptr, "write to tunnel {} which is already closed.", tunnel_id); - return tunnel_sender->isWritable(); - case TunnelStatus::Finished: - RUNTIME_CHECK_MSG(tunnel_sender != nullptr, "write to tunnel {} which is already closed.", tunnel_id); - throw Exception(fmt::format( - "write to tunnel {} which is already closed, {}", - tunnel_id, - tunnel_sender->isConsumerFinished() ? tunnel_sender->getConsumerFinishMsg() : "")); - default: - RUNTIME_ASSERT(false, log, "Unsupported tunnel status: {}", magic_enum::enum_name(status)); - } -} - WaitResult MPPTunnel::waitForWritable() const { std::unique_lock lk(mu); diff --git a/dbms/src/Flash/Mpp/MPPTunnel.h b/dbms/src/Flash/Mpp/MPPTunnel.h index 47d513c7812..6eb57fd14b7 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.h +++ b/dbms/src/Flash/Mpp/MPPTunnel.h @@ -493,15 +493,14 @@ class MPPTunnel : private boost::noncopyable void write(TrackedMppDataPacketPtr && data); // forceWrite write a single packet to the tunnel's send queue without blocking, - // and need to call isReadForWrite first. + // and need to call waitForWritable first. // ``` - // while (!isWritable()) {} + // auto res = waitForWritable(); + // switch (res) case... // forceWrite(std::move(data)); // ``` - void forceWrite(TrackedMppDataPacketPtr && data); - bool isWritable() const; - WaitResult waitForWritable() const; + void forceWrite(TrackedMppDataPacketPtr && data); // finish the writing, and wait until the sender finishes. void writeDone(); diff --git a/dbms/src/Flash/Mpp/MPPTunnelSet.cpp b/dbms/src/Flash/Mpp/MPPTunnelSet.cpp index a499444075d..47f7fe2299c 100644 --- a/dbms/src/Flash/Mpp/MPPTunnelSet.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnelSet.cpp @@ -18,7 +18,6 @@ #include #include #include -#include "Flash/Mpp/MPPTunnel.h" namespace DB { @@ -68,17 +67,6 @@ void MPPTunnelSetBase::forceWrite(tipb::SelectResponse & response, size_ tunnels[index]->forceWrite(serializePacket(response)); } -template -bool MPPTunnelSetBase::isWritable() const -{ - for (const auto & tunnel : tunnels) - { - if (!tunnel->isWritable()) - return false; - } - return true; -} - template WaitResult MPPTunnelSetBase::waitForWritable() const { diff --git a/dbms/src/Flash/Mpp/MPPTunnelSet.h b/dbms/src/Flash/Mpp/MPPTunnelSet.h index 606a1a0281b..a57f57b3ac8 100644 --- a/dbms/src/Flash/Mpp/MPPTunnelSet.h +++ b/dbms/src/Flash/Mpp/MPPTunnelSet.h @@ -60,8 +60,6 @@ class MPPTunnelSetBase : private boost::noncopyable const std::vector & getTunnels() const { return tunnels; } - bool isWritable() const; - WaitResult waitForWritable() const; bool isLocal(size_t index) const; diff --git a/dbms/src/Flash/Mpp/MPPTunnelSetWriter.h b/dbms/src/Flash/Mpp/MPPTunnelSetWriter.h index 6bb06a0c190..1af6730f108 100644 --- a/dbms/src/Flash/Mpp/MPPTunnelSetWriter.h +++ b/dbms/src/Flash/Mpp/MPPTunnelSetWriter.h @@ -69,8 +69,6 @@ class MPPTunnelSetWriterBase : private boost::noncopyable uint16_t getPartitionNum() const { return mpp_tunnel_set->getPartitionNum(); } - virtual bool isWritable() const = 0; - virtual WaitResult waitForWritable() const = 0; protected: @@ -93,9 +91,6 @@ class SyncMPPTunnelSetWriter : public MPPTunnelSetWriterBase : MPPTunnelSetWriterBase(mpp_tunnel_set_, result_field_types_, req_id) {} - // For sync writer, `isWritable` will not be called, so an exception is thrown here. - bool isWritable() const override { throw Exception("Unsupport sync writer"); } - // For sync writer, `waitForWritable` will not be called, so an exception is thrown here. WaitResult waitForWritable() const override { throw Exception("Unsupport sync writer"); } @@ -115,8 +110,6 @@ class AsyncMPPTunnelSetWriter : public MPPTunnelSetWriterBase : MPPTunnelSetWriterBase(mpp_tunnel_set_, result_field_types_, req_id) {} - bool isWritable() const override { return mpp_tunnel_set->isWritable(); } - WaitResult waitForWritable() const override { return mpp_tunnel_set->waitForWritable(); } protected: diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp index 31520093a78..5953549827b 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp @@ -224,7 +224,6 @@ struct MockExchangeWriter // make only part 0 use local tunnel return index == 0; } - static bool isWritable() { throw Exception("Unsupport async write"); } static WaitResult waitForWritable() { throw Exception("Unsupport async write"); } private: diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp index 3d71f00cb0e..bf4a494651e 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp @@ -794,7 +794,7 @@ TEST_F(TestMPPTunnel, SyncTunnelForceWrite) mpp_tunnel_ptr->connectSync(writer_ptr.get()); GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); - ASSERT_TRUE(mpp_tunnel_ptr->isWritable()); + ASSERT_TRUE(mpp_tunnel_ptr->waitForWritable() == WaitResult::Ready); mpp_tunnel_ptr->forceWrite(newDataPacket("First")); mpp_tunnel_ptr->writeDone(); GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); @@ -811,7 +811,7 @@ TEST_F(TestMPPTunnel, AsyncTunnelForceWrite) GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); std::thread t(&MockAsyncCallData::run, call_data.get()); - ASSERT_TRUE(mpp_tunnel_ptr->isWritable()); + ASSERT_TRUE(mpp_tunnel_ptr->waitForWritable() == WaitResult::Ready); mpp_tunnel_ptr->forceWrite(newDataPacket("First")); mpp_tunnel_ptr->writeDone(); GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); @@ -828,7 +828,7 @@ TEST_F(TestMPPTunnel, LocalTunnelForceWrite) GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); std::thread t(&MockExchangeReceiver::receiveAll, receiver.get()); - ASSERT_TRUE(mpp_tunnel_ptr->isWritable()); + ASSERT_TRUE(mpp_tunnel_ptr->waitForWritable() == WaitResult::Ready); mpp_tunnel_ptr->forceWrite(newDataPacket("First")); mpp_tunnel_ptr->writeDone(); GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); @@ -846,7 +846,7 @@ try Stopwatch stop_watch{CLOCK_MONOTONIC_COARSE}; while (stop_watch.elapsedSeconds() < 3 * timeout.count()) { - ASSERT_FALSE(mpp_tunnel_ptr->isWritable()); + ASSERT_FALSE(mpp_tunnel_ptr->waitForWritable() == WaitResult::Ready); } GTEST_FAIL(); } diff --git a/dbms/src/Operators/ExchangeSenderSinkOp.cpp b/dbms/src/Operators/ExchangeSenderSinkOp.cpp index 4622e97d864..1f073c67d20 100644 --- a/dbms/src/Operators/ExchangeSenderSinkOp.cpp +++ b/dbms/src/Operators/ExchangeSenderSinkOp.cpp @@ -13,7 +13,6 @@ // limitations under the License. #include -#include "Flash/Coprocessor/WaitResult.h" namespace DB { @@ -40,12 +39,7 @@ OperatorStatus ExchangeSenderSinkOp::writeImpl(Block && block) return OperatorStatus::NEED_INPUT; } -OperatorStatus ExchangeSenderSinkOp::prepareImpl() -{ - return awaitImpl(); -} - -OperatorStatus ExchangeSenderSinkOp::awaitImpl() +OperatorStatus ExchangeSenderSinkOp::waitForWriter() const { auto res = writer->waitForWritable(); switch (res) @@ -59,4 +53,14 @@ OperatorStatus ExchangeSenderSinkOp::awaitImpl() } } +OperatorStatus ExchangeSenderSinkOp::prepareImpl() +{ + return waitForWriter(); +} + +OperatorStatus ExchangeSenderSinkOp::awaitImpl() +{ + return waitForWriter(); +} + } // namespace DB diff --git a/dbms/src/Operators/ExchangeSenderSinkOp.h b/dbms/src/Operators/ExchangeSenderSinkOp.h index b4e4702d518..6a04c10faf2 100644 --- a/dbms/src/Operators/ExchangeSenderSinkOp.h +++ b/dbms/src/Operators/ExchangeSenderSinkOp.h @@ -43,6 +43,9 @@ class ExchangeSenderSinkOp : public SinkOp OperatorStatus awaitImpl() override; +private: + OperatorStatus waitForWriter() const; + private: std::unique_ptr writer; size_t total_rows = 0; From 563573bc94f04b4e30714f177a79670924c7dab7 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 22 May 2024 19:16:28 +0800 Subject: [PATCH 05/11] try fmt --- dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp index bf4a494651e..bbea4210d36 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp @@ -31,7 +31,6 @@ #include #include - namespace DB { namespace tests From e1e920cb81b1d17af1443e5480d1c9e2bf5a1f55 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 22 May 2024 22:45:34 +0800 Subject: [PATCH 06/11] refine ut --- dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp index bbea4210d36..e2fb701431f 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp @@ -793,7 +793,7 @@ TEST_F(TestMPPTunnel, SyncTunnelForceWrite) mpp_tunnel_ptr->connectSync(writer_ptr.get()); GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); - ASSERT_TRUE(mpp_tunnel_ptr->waitForWritable() == WaitResult::Ready); + GTEST_ASSERT_EQ(mpp_tunnel_ptr->waitForWritable(), WaitResult::Ready); mpp_tunnel_ptr->forceWrite(newDataPacket("First")); mpp_tunnel_ptr->writeDone(); GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); @@ -810,7 +810,7 @@ TEST_F(TestMPPTunnel, AsyncTunnelForceWrite) GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); std::thread t(&MockAsyncCallData::run, call_data.get()); - ASSERT_TRUE(mpp_tunnel_ptr->waitForWritable() == WaitResult::Ready); + GTEST_ASSERT_EQ(mpp_tunnel_ptr->waitForWritable(), WaitResult::Ready); mpp_tunnel_ptr->forceWrite(newDataPacket("First")); mpp_tunnel_ptr->writeDone(); GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); @@ -827,7 +827,7 @@ TEST_F(TestMPPTunnel, LocalTunnelForceWrite) GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); std::thread t(&MockExchangeReceiver::receiveAll, receiver.get()); - ASSERT_TRUE(mpp_tunnel_ptr->waitForWritable() == WaitResult::Ready); + GTEST_ASSERT_EQ(mpp_tunnel_ptr->waitForWritable(), WaitResult::Ready); mpp_tunnel_ptr->forceWrite(newDataPacket("First")); mpp_tunnel_ptr->writeDone(); GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); @@ -845,7 +845,7 @@ try Stopwatch stop_watch{CLOCK_MONOTONIC_COARSE}; while (stop_watch.elapsedSeconds() < 3 * timeout.count()) { - ASSERT_FALSE(mpp_tunnel_ptr->waitForWritable() == WaitResult::Ready); + GTEST_ASSERT_EQ(mpp_tunnel_ptr->waitForWritable(), WaitResult::WaitForPolling); } GTEST_FAIL(); } From c5655c667736725e2754e16b8488abfcc750c06c Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Thu, 23 May 2024 14:33:13 +0800 Subject: [PATCH 07/11] format Signed-off-by: Lloyd-Pottiger --- dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp index e2fb701431f..ff11aef5ec1 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp @@ -750,8 +750,7 @@ try tunnel->write(std::move(packet)); } catch (...) - { - } + {} }; std::thread thd(tunnelRun, std::move(run_tunnel)); thd.join(); From df2350696de3e3a8928884a3d774edef837812fb Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 5 Jun 2024 12:57:50 +0800 Subject: [PATCH 08/11] fix may hang issue --- dbms/src/Flash/Executor/PipelineExecutor.cpp | 1 + dbms/src/Flash/Executor/PipelineExecutorContext.cpp | 8 ++++++++ dbms/src/Flash/Executor/PipelineExecutorContext.h | 6 ++++++ 3 files changed, 15 insertions(+) diff --git a/dbms/src/Flash/Executor/PipelineExecutor.cpp b/dbms/src/Flash/Executor/PipelineExecutor.cpp index 602531fa49e..bb3f5833900 100644 --- a/dbms/src/Flash/Executor/PipelineExecutor.cpp +++ b/dbms/src/Flash/Executor/PipelineExecutor.cpp @@ -34,6 +34,7 @@ PipelineExecutor::PipelineExecutor( // But for cop/batchCop, there is no such unique identifier, so an empty value is given here, indicating that the query id of PipelineExecutor is invalid. /*query_id=*/context.getDAGContext()->isMPPTask() ? context.getDAGContext()->getMPPTaskId().toString() : "", req_id, + context.getDAGContext(), memory_tracker_, auto_spill_trigger, register_operator_spill_context, diff --git a/dbms/src/Flash/Executor/PipelineExecutorContext.cpp b/dbms/src/Flash/Executor/PipelineExecutorContext.cpp index 516fdc36e3c..72a5e7a2bcb 100644 --- a/dbms/src/Flash/Executor/PipelineExecutorContext.cpp +++ b/dbms/src/Flash/Executor/PipelineExecutorContext.cpp @@ -12,7 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include +#include #include #include @@ -153,6 +155,12 @@ void PipelineExecutorContext::cancel() bool origin_value = false; if (is_cancelled.compare_exchange_strong(origin_value, true, std::memory_order_release)) { + if (likely(dag_context)) + { + // Cancel the tunnel_set here to prevent pipeline tasks waiting in the WAIT_FOR_NOTIFY state from never being notified. + if (dag_context->tunnel_set) + dag_context->tunnel_set->close("", false); + } cancelSharedQueues(); if likely (TaskScheduler::instance && !query_id.empty()) TaskScheduler::instance->cancel(query_id, resource_group_name); diff --git a/dbms/src/Flash/Executor/PipelineExecutorContext.h b/dbms/src/Flash/Executor/PipelineExecutorContext.h index fca5291c4da..951d4465da8 100644 --- a/dbms/src/Flash/Executor/PipelineExecutorContext.h +++ b/dbms/src/Flash/Executor/PipelineExecutorContext.h @@ -34,6 +34,8 @@ using RegisterOperatorSpillContext = std::function; +class DAGContext; + class PipelineExecutorContext : private boost::noncopyable { public: @@ -50,12 +52,14 @@ class PipelineExecutorContext : private boost::noncopyable PipelineExecutorContext( const String & query_id_, const String & req_id, + DAGContext * dag_context_, const MemoryTrackerPtr & mem_tracker_, AutoSpillTrigger * auto_spill_trigger_ = nullptr, const RegisterOperatorSpillContext & register_operator_spill_context_ = nullptr, const String & resource_group_name_ = "") : query_id(query_id_) , log(Logger::get(req_id)) + , dag_context(dag_context_) , mem_tracker(mem_tracker_) , auto_spill_trigger(auto_spill_trigger_) , register_operator_spill_context(register_operator_spill_context_) @@ -194,6 +198,8 @@ class PipelineExecutorContext : private boost::noncopyable LoggerPtr log; + DAGContext * dag_context{nullptr}; + MemoryTrackerPtr mem_tracker; std::mutex mu; From 0c43a11a70aa86e75c3e482f00d584615b1a44ae Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 5 Jun 2024 13:31:29 +0800 Subject: [PATCH 09/11] some fix --- dbms/src/Flash/Executor/PipelineExecutorContext.cpp | 2 +- dbms/src/Flash/Mpp/MPPTask.cpp | 6 +++--- dbms/src/Flash/Mpp/MPPTunnel.h | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/dbms/src/Flash/Executor/PipelineExecutorContext.cpp b/dbms/src/Flash/Executor/PipelineExecutorContext.cpp index 72a5e7a2bcb..a949010d505 100644 --- a/dbms/src/Flash/Executor/PipelineExecutorContext.cpp +++ b/dbms/src/Flash/Executor/PipelineExecutorContext.cpp @@ -159,7 +159,7 @@ void PipelineExecutorContext::cancel() { // Cancel the tunnel_set here to prevent pipeline tasks waiting in the WAIT_FOR_NOTIFY state from never being notified. if (dag_context->tunnel_set) - dag_context->tunnel_set->close("", false); + dag_context->tunnel_set->close(getExceptionMsg(), false); } cancelSharedQueues(); if likely (TaskScheduler::instance && !query_id.empty()) diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index 75c5c221ed4..2bac7bcd115 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -759,13 +759,13 @@ void MPPTask::abort(const String & message, AbortType abort_type) } else if (previous_status == RUNNING && switchStatus(RUNNING, next_task_status)) { - /// abort the components from top to bottom because if bottom components are aborted - /// first, the top components may see an error caused by the abort, which is not + /// abort mpptunnels first because if others components are aborted + /// first, the mpptunnels may see an error caused by the abort, which is not /// the original error setErrString(message); abortTunnels(message, false); - abortQueryExecutor(); abortReceivers(); + abortQueryExecutor(); scheduleThisTask(ScheduleState::FAILED); /// runImpl is running, leave remaining work to runImpl LOG_WARNING(log, "Finish abort task from running"); diff --git a/dbms/src/Flash/Mpp/MPPTunnel.h b/dbms/src/Flash/Mpp/MPPTunnel.h index 6eb57fd14b7..4c2421437e4 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.h +++ b/dbms/src/Flash/Mpp/MPPTunnel.h @@ -336,7 +336,7 @@ class LocalTunnelSenderV2 : public TunnelSender template bool pushImpl(TrackedMppDataPacketPtr && data) { - if (unlikely(checkPacketErr(data))) + if (unlikely(is_done || checkPacketErr(data))) return false; // When ExchangeReceiver receives data from local and remote tiflash, number of local tunnel threads From 94d954b696a1edee74b7839a583d2d2d60283f25 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 5 Jun 2024 13:48:50 +0800 Subject: [PATCH 10/11] fix ut --- dbms/src/Flash/Executor/PipelineExecutor.cpp | 2 +- dbms/src/Flash/Executor/PipelineExecutorContext.h | 8 ++++---- .../TaskQueues/tests/gtest_resource_control_queue.cpp | 6 +++++- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/dbms/src/Flash/Executor/PipelineExecutor.cpp b/dbms/src/Flash/Executor/PipelineExecutor.cpp index bb3f5833900..b7c86b27ef2 100644 --- a/dbms/src/Flash/Executor/PipelineExecutor.cpp +++ b/dbms/src/Flash/Executor/PipelineExecutor.cpp @@ -34,8 +34,8 @@ PipelineExecutor::PipelineExecutor( // But for cop/batchCop, there is no such unique identifier, so an empty value is given here, indicating that the query id of PipelineExecutor is invalid. /*query_id=*/context.getDAGContext()->isMPPTask() ? context.getDAGContext()->getMPPTaskId().toString() : "", req_id, - context.getDAGContext(), memory_tracker_, + context.getDAGContext(), auto_spill_trigger, register_operator_spill_context, context.getDAGContext()->getResourceGroupName()) diff --git a/dbms/src/Flash/Executor/PipelineExecutorContext.h b/dbms/src/Flash/Executor/PipelineExecutorContext.h index 951d4465da8..3181d6fe6aa 100644 --- a/dbms/src/Flash/Executor/PipelineExecutorContext.h +++ b/dbms/src/Flash/Executor/PipelineExecutorContext.h @@ -52,15 +52,15 @@ class PipelineExecutorContext : private boost::noncopyable PipelineExecutorContext( const String & query_id_, const String & req_id, - DAGContext * dag_context_, const MemoryTrackerPtr & mem_tracker_, + DAGContext * dag_context_ = nullptr, AutoSpillTrigger * auto_spill_trigger_ = nullptr, const RegisterOperatorSpillContext & register_operator_spill_context_ = nullptr, const String & resource_group_name_ = "") : query_id(query_id_) , log(Logger::get(req_id)) - , dag_context(dag_context_) , mem_tracker(mem_tracker_) + , dag_context(dag_context_) , auto_spill_trigger(auto_spill_trigger_) , register_operator_spill_context(register_operator_spill_context_) , resource_group_name(resource_group_name_) @@ -198,10 +198,10 @@ class PipelineExecutorContext : private boost::noncopyable LoggerPtr log; - DAGContext * dag_context{nullptr}; - MemoryTrackerPtr mem_tracker; + DAGContext * dag_context{nullptr}; + std::mutex mu; std::condition_variable cv; std::exception_ptr exception_ptr; diff --git a/dbms/src/Flash/Pipeline/Schedule/TaskQueues/tests/gtest_resource_control_queue.cpp b/dbms/src/Flash/Pipeline/Schedule/TaskQueues/tests/gtest_resource_control_queue.cpp index 5003117c91c..76565dac6ea 100644 --- a/dbms/src/Flash/Pipeline/Schedule/TaskQueues/tests/gtest_resource_control_queue.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/TaskQueues/tests/gtest_resource_control_queue.cpp @@ -176,6 +176,7 @@ class TestResourceControlQueue : public ::testing::Test mem_tracker, nullptr, nullptr, + nullptr, resource_group_name); } return all_contexts; @@ -378,6 +379,7 @@ TEST_F(TestResourceControlQueue, BasicTest) mem_tracker, nullptr, nullptr, + nullptr, group_name); for (int j = 0; j < task_num_per_resource_group; ++j) @@ -413,6 +415,7 @@ TEST_F(TestResourceControlQueue, BasicTimeoutTest) mem_tracker, nullptr, nullptr, + nullptr, group_name); auto task = std::make_unique(*exec_context); @@ -441,7 +444,8 @@ TEST_F(TestResourceControlQueue, RunOutOfRU) TaskSchedulerConfig config{thread_num, thread_num}; TaskScheduler task_scheduler(config); - PipelineExecutorContext exec_context("mock-query-id", "mock-req-id", mem_tracker, nullptr, nullptr, rg_name); + PipelineExecutorContext + exec_context("mock-query-id", "mock-req-id", mem_tracker, nullptr, nullptr, nullptr, rg_name); auto task = std::make_unique(exec_context); // This task should use 5*100ms cpu_time. From dd1316fb66753eb636ab216f8d2fae17ec618c49 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 5 Jun 2024 14:00:11 +0800 Subject: [PATCH 11/11] refine err msg --- .../Executor/PipelineExecutorContext.cpp | 23 +++++++++++++++++-- .../Flash/Executor/PipelineExecutorContext.h | 2 ++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/dbms/src/Flash/Executor/PipelineExecutorContext.cpp b/dbms/src/Flash/Executor/PipelineExecutorContext.cpp index a949010d505..a3266cec721 100644 --- a/dbms/src/Flash/Executor/PipelineExecutorContext.cpp +++ b/dbms/src/Flash/Executor/PipelineExecutorContext.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -53,6 +54,24 @@ String PipelineExecutorContext::getExceptionMsg() } } +String PipelineExecutorContext::getTrimmedErrMsg() +{ + try + { + auto cur_exception_ptr = getExceptionPtr(); + if (!cur_exception_ptr) + return ""; + std::rethrow_exception(cur_exception_ptr); + } + catch (...) + { + auto err_msg = getCurrentExceptionMessage(true, true); + if (likely(!err_msg.empty())) + trimStackTrace(err_msg); + return err_msg; + } +} + void PipelineExecutorContext::onErrorOccurred(const String & err_msg) { DB::Exception e(err_msg); @@ -155,13 +174,13 @@ void PipelineExecutorContext::cancel() bool origin_value = false; if (is_cancelled.compare_exchange_strong(origin_value, true, std::memory_order_release)) { + cancelSharedQueues(); if (likely(dag_context)) { // Cancel the tunnel_set here to prevent pipeline tasks waiting in the WAIT_FOR_NOTIFY state from never being notified. if (dag_context->tunnel_set) - dag_context->tunnel_set->close(getExceptionMsg(), false); + dag_context->tunnel_set->close(getTrimmedErrMsg(), false); } - cancelSharedQueues(); if likely (TaskScheduler::instance && !query_id.empty()) TaskScheduler::instance->cancel(query_id, resource_group_name); } diff --git a/dbms/src/Flash/Executor/PipelineExecutorContext.h b/dbms/src/Flash/Executor/PipelineExecutorContext.h index 3181d6fe6aa..b0f8d6b0946 100644 --- a/dbms/src/Flash/Executor/PipelineExecutorContext.h +++ b/dbms/src/Flash/Executor/PipelineExecutorContext.h @@ -186,6 +186,8 @@ class PipelineExecutorContext : private boost::noncopyable private: bool setExceptionPtr(const std::exception_ptr & exception_ptr_); + String getTrimmedErrMsg(); + // Need to be called under lock. bool isWaitMode();