From 098658fb091372b0e07aafc8c5eff628b334048d Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Sun, 29 Jan 2023 11:24:41 +0800 Subject: [PATCH 01/14] init --- dbms/src/Debug/dbgQueryExecutor.cpp | 3 +- dbms/src/Flash/Mpp/ExchangeReceiver.cpp | 78 +++++++++++------- dbms/src/Flash/Mpp/ExchangeReceiver.h | 4 + dbms/src/Flash/Mpp/GRPCReceiverContext.cpp | 92 ++++++++++++++++++++++ dbms/src/Flash/Mpp/GRPCReceiverContext.h | 4 + dbms/src/Flash/Mpp/MPPTask.cpp | 4 +- dbms/src/Flash/Mpp/MPPTunnel.cpp | 46 +++++++++++ dbms/src/Flash/Mpp/MPPTunnel.h | 38 +++++++++ dbms/src/Interpreters/Settings.h | 3 +- dbms/src/Storages/StorageDisaggregated.cpp | 1 + 10 files changed, 240 insertions(+), 33 deletions(-) diff --git a/dbms/src/Debug/dbgQueryExecutor.cpp b/dbms/src/Debug/dbgQueryExecutor.cpp index 359aa833f25..733e1c9edcc 100644 --- a/dbms/src/Debug/dbgQueryExecutor.cpp +++ b/dbms/src/Debug/dbgQueryExecutor.cpp @@ -68,7 +68,8 @@ BlockInputStreamPtr constructExchangeReceiverStream(Context & context, tipb::Exc 10, /*req_id=*/"", /*executor_id=*/"", - /*fine_grained_shuffle_stream_count=*/0); + /*fine_grained_shuffle_stream_count=*/0, + context.getSettings().enable_refined_local_tunnel); BlockInputStreamPtr ret = std::make_shared(exchange_receiver, /*req_id=*/"", /*executor_id=*/"", /*stream_id*/ 0); return ret; } diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index da9a00f59de..6b9064bc68c 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -308,6 +308,7 @@ ExchangeReceiverBase::ExchangeReceiverBase( const String & req_id, const String & executor_id, uint64_t fine_grained_shuffle_stream_count_, + bool enable_refined_local_, const std::vector & disaggregated_dispatch_reqs_) : rpc_context(std::move(rpc_context_)) , source_num(source_num_) @@ -320,6 +321,7 @@ ExchangeReceiverBase::ExchangeReceiverBase( , state(ExchangeReceiverState::NORMAL) , exc_log(Logger::get(req_id, executor_id)) , collected(false) + , enable_refined_local(enable_refined_local_) , data_size_in_queue(0) , disaggregated_dispatch_reqs(disaggregated_dispatch_reqs_) { @@ -455,37 +457,39 @@ void ExchangeReceiverBase::setUpConnection() async_requests.push_back(std::move(req)); else if (req.is_local) { - String req_info = fmt::format("tunnel{}+{}", req.send_task_id, req.recv_task_id); - - LocalRequestHandler local_request_handler( - getMemoryTracker(), - [this](bool meet_error, const String & local_err_msg) { - this->connectionDone(meet_error, local_err_msg, exc_log); - }, - [this]() { - this->connectionLocalDone(); - }, - [this]() { - this->addLocalConnectionNum(); - }, - ReceiverChannelWriter(&(getMsgChannels()), req_info, exc_log, getDataSizeInQueue(), ReceiverMode::Local)); - - rpc_context->establishMPPConnectionLocal( - req, - req.source_index, - local_request_handler, - enable_fine_grained_shuffle_flag); + if (enable_refined_local) + { + LOG_INFO(exc_log, "refined local tunnel is enabled"); + String req_info = fmt::format("tunnel{}+{}", req.send_task_id, req.recv_task_id); + + LocalRequestHandler local_request_handler( + getMemoryTracker(), + [this](bool meet_error, const String & local_err_msg) { + this->connectionDone(meet_error, local_err_msg, exc_log); + }, + [this]() { + this->connectionLocalDone(); + }, + [this]() { + this->addLocalConnectionNum(); + }, + ReceiverChannelWriter(&(getMsgChannels()), req_info, exc_log, getDataSizeInQueue(), ReceiverMode::Local)); + + rpc_context->establishMPPConnectionLocal( + req, + req.source_index, + local_request_handler, + enable_fine_grained_shuffle_flag); + } + else + { + LOG_INFO(exc_log, "refined local tunnel is disabled"); + setUpConnectionWithReadLoop(std::move(req)); + } } else { - thread_manager->schedule(true, "Receiver", [this, req = std::move(req)] { - if (enable_fine_grained_shuffle_flag) - readLoop(req); - else - readLoop(req); - }); - - ++thread_count; + setUpConnectionWithReadLoop(std::move(req)); } } @@ -503,6 +507,19 @@ void ExchangeReceiverBase::setUpConnection() } } +template +void ExchangeReceiverBase::setUpConnectionWithReadLoop(ExchangeRecvRequest && req) +{ + thread_manager->schedule(true, "Receiver", [this, req = std::move(req)] { + if (enable_fine_grained_shuffle_flag) + readLoop(req); + else + readLoop(req); + }); + + ++thread_count; +} + template template void ExchangeReceiverBase::reactor(const std::vector & async_requests) @@ -567,10 +584,11 @@ void ExchangeReceiverBase::readLoop(const Request & req) try { auto status = RPCContext::getStatusOK(); - ReceiverChannelWriter channel_writer(&msg_channels, req_info, log, &data_size_in_queue, ReceiverMode::Sync); + ReceiverMode recv_mode = req.is_local ? ReceiverMode::Local : ReceiverMode::Sync; + ReceiverChannelWriter channel_writer(&msg_channels, req_info, log, &data_size_in_queue, recv_mode); for (int i = 0; i < max_retry_times; ++i) { - auto reader = rpc_context->makeSyncReader(req); + auto reader = rpc_context->makeReader(req); bool has_data = false; for (;;) { diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.h b/dbms/src/Flash/Mpp/ExchangeReceiver.h index 087b7c45c97..ebeae3f5d13 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.h +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.h @@ -97,6 +97,7 @@ class ExchangeReceiverBase const String & req_id, const String & executor_id, uint64_t fine_grained_shuffle_stream_count, + bool enable_refined_local_, const std::vector & disaggregated_dispatch_reqs_ = {}); ~ExchangeReceiverBase(); @@ -166,6 +167,8 @@ class ExchangeReceiverBase void connectionLocalDone(); + void setUpConnectionWithReadLoop(ExchangeRecvRequest && req); + bool isReceiverForTiFlashStorage() { // If not empty, need to send MPPTask to tiflash_storage. @@ -198,6 +201,7 @@ class ExchangeReceiverBase bool collected = false; int thread_count = 0; + bool enable_refined_local; std::atomic data_size_in_queue; diff --git a/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp b/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp index eadbc19d099..f3ba0d0182c 100644 --- a/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp +++ b/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp @@ -288,6 +288,98 @@ void GRPCReceiverContext::establishMPPConnectionLocal( tunnel->connectLocal(source_index, local_request_handler, is_fine_grained); } +// TODO remove it in the future +std::tuple GRPCReceiverContext::establishMPPConnectionLocalUnrefined( + const ::mpp::EstablishMPPConnectionRequest * request, + const std::shared_ptr & task_manager) const +{ + std::chrono::seconds timeout(10); + auto [tunnel, err_msg] = task_manager->findTunnelWithTimeout(request, timeout); + if (tunnel == nullptr) + { + return std::make_tuple(tunnel, grpc::Status(grpc::StatusCode::INTERNAL, err_msg)); + } + if (!tunnel->isLocal()) + { + return std::make_tuple(nullptr, grpc::Status(grpc::StatusCode::INTERNAL, "EstablishMPPConnectionLocal into a remote channel!")); + } + tunnel->connectUnrefinedLocal(nullptr); + return std::make_tuple(tunnel, grpc::Status::OK); +} + +// TODO remove it in the future +struct LocalExchangePacketReader : public ExchangePacketReader +{ + LocalTunnelSenderUnrefinedPtr local_tunnel_sender; + + explicit LocalExchangePacketReader(const LocalTunnelSenderUnrefinedPtr & local_tunnel_sender_) + : local_tunnel_sender(local_tunnel_sender_) + {} + + /// put the implementation of dtor in .cpp so we don't need to put the specialization of + /// pingcap::kv::RpcCall in header file. + ~LocalExchangePacketReader() override + { + if (local_tunnel_sender) + { + // In case that ExchangeReceiver throw error before finish reading from mpp_tunnel + local_tunnel_sender->consumerFinish("Receiver exists"); + local_tunnel_sender.reset(); + } + } + + bool read(TrackedMppDataPacketPtr & packet) override + { + TrackedMppDataPacketPtr tmp_packet = local_tunnel_sender->readForLocal(); + bool success = tmp_packet != nullptr; + if (success) + packet = tmp_packet; + return success; + } + + void cancel(const String & reason) override + { + if (local_tunnel_sender) + { + local_tunnel_sender->consumerFinish(fmt::format("Receiver cancelled, reason: {}", reason)); + local_tunnel_sender.reset(); + } + } + + grpc::Status finish() override + { + if (local_tunnel_sender) + { + local_tunnel_sender->consumerFinish("Receiver finished!"); + local_tunnel_sender.reset(); + } + return ::grpc::Status::OK; + } +}; + +// TODO remove it in the future +ExchangePacketReaderPtr GRPCReceiverContext::makeReader(const ExchangeRecvRequest & request) const +{ + if (request.is_local) + { + auto [tunnel, status] = establishMPPConnectionLocalUnrefined(request.req.get(), task_manager); + if (!status.ok()) + { + throw Exception("Exchange receiver meet error : " + status.error_message()); + } + return std::make_shared(tunnel->getLocalTunnelSenderUnrefined()); + } + else + { + auto reader = std::make_shared(request); + reader->reader = cluster->rpc_client->sendStreamRequest( + request.req->sender_meta().address(), + &reader->client_context, + *reader->call); + return reader; + } +} + ExchangePacketReaderPtr GRPCReceiverContext::makeSyncReader(const ExchangeRecvRequest & request) const { auto reader = std::make_shared(request); diff --git a/dbms/src/Flash/Mpp/GRPCReceiverContext.h b/dbms/src/Flash/Mpp/GRPCReceiverContext.h index f98c2f73bb5..82c1650b830 100644 --- a/dbms/src/Flash/Mpp/GRPCReceiverContext.h +++ b/dbms/src/Flash/Mpp/GRPCReceiverContext.h @@ -86,6 +86,8 @@ class GRPCReceiverContext bool supportAsync(const ExchangeRecvRequest & request) const; + ExchangePacketReaderPtr makeReader(const ExchangeRecvRequest & request) const; + ExchangePacketReaderPtr makeSyncReader(const ExchangeRecvRequest & request) const; void makeAsyncReader( @@ -103,6 +105,8 @@ class GRPCReceiverContext void establishMPPConnectionLocal(const ExchangeRecvRequest & request, size_t source_index, LocalRequestHandler & local_request_handler, bool is_fine_grained); + std::tuple establishMPPConnectionLocalUnrefined(const ::mpp::EstablishMPPConnectionRequest * request, const std::shared_ptr & task_manager) const; + // Only for tiflash_compute mode, make sure disaggregated_dispatch_reqs is not empty. void sendMPPTaskToTiFlashStorageNode( LoggerPtr log, diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index 81988dfd795..1904c40f676 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -207,7 +207,9 @@ void MPPTask::initExchangeReceivers() context->getMaxStreams(), log->identifier(), executor_id, - executor.fine_grained_shuffle_stream_count()); + executor.fine_grained_shuffle_stream_count(), + context->getSettings().enable_refined_local_tunnel); + if (status != RUNNING) throw Exception("exchange receiver map can not be initialized, because the task is not in running state"); diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index 1d4dae02858..d72ebafb691 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -392,4 +392,50 @@ void SyncTunnelSender::startSendThread(PacketWriter * writer) sendJob(writer); }); } + +// TODO remove it in the future +void MPPTunnel::connectUnrefinedLocal(PacketWriter * writer) +{ + { + std::unique_lock lk(mu); + if (status != TunnelStatus::Unconnected) + throw Exception(fmt::format("MPPTunnel has connected or finished: {}", statusToString())); + + LOG_TRACE(log, "ready to connect"); + + RUNTIME_ASSERT(writer == nullptr, log); + local_tunnel_sender_unrefined = std::make_shared(queue_size, mem_tracker, log, tunnel_id, &data_size_in_queue); + tunnel_sender = local_tunnel_sender_unrefined; + + status = TunnelStatus::Connected; + cv_for_status_changed.notify_all(); + } + LOG_DEBUG(log, "connected"); +} + +std::shared_ptr LocalTunnelSenderUnrefined::readForLocal() +{ + TrackedMppDataPacketPtr res; + auto result = send_queue.pop(res); + if (result == MPMCQueueResult::OK) + { + MPPTunnelMetric::subDataSizeMetric(*data_size_in_queue, res->getPacket().ByteSizeLong()); + + // switch tunnel's memory tracker into receiver's + res->switchMemTracker(current_memory_tracker); + return res; + } + else if (result == MPMCQueueResult::CANCELLED) + { + RUNTIME_ASSERT(!send_queue.getCancelReason().empty(), "Tunnel sender cancelled without reason"); + if (!cancel_reason_sent) + { + cancel_reason_sent = true; + res = std::make_shared(getPacketWithError(send_queue.getCancelReason()), current_memory_tracker); + return res; + } + } + consumerFinish(""); + return nullptr; +} } // namespace DB diff --git a/dbms/src/Flash/Mpp/MPPTunnel.h b/dbms/src/Flash/Mpp/MPPTunnel.h index 353172dbc87..f7b75463644 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.h +++ b/dbms/src/Flash/Mpp/MPPTunnel.h @@ -332,11 +332,45 @@ class LocalTunnelSender : public TunnelSender std::atomic_bool is_done; }; +// TODO remove it in the future +class LocalTunnelSenderUnrefined : public TunnelSender +{ +public: + using Base = TunnelSender; + using Base::Base; + + LocalTunnelSenderUnrefined(size_t queue_size, MemoryTrackerPtr & memory_tracker_, const LoggerPtr & log_, const String & tunnel_id_, std::atomic * data_size_in_queue_) + : TunnelSender(memory_tracker_, log_, tunnel_id_, data_size_in_queue_) + , send_queue(queue_size) {} + + TrackedMppDataPacketPtr readForLocal(); + + bool push(TrackedMppDataPacketPtr && data) override + { + return send_queue.push(std::move(data)) == MPMCQueueResult::OK; + } + + void cancelWith(const String & reason) override + { + send_queue.cancelWith(reason); + } + + bool finish() override + { + return send_queue.finish(); + } + +private: + bool cancel_reason_sent = false; + MPMCQueue send_queue; +}; + using TunnelSenderPtr = std::shared_ptr; using SyncTunnelSenderPtr = std::shared_ptr; using AsyncTunnelSenderPtr = std::shared_ptr; using LocalTunnelSenderPtr = std::shared_ptr>; using LocalTunnelFineGrainedSenderPtr = std::shared_ptr>; +using LocalTunnelSenderUnrefinedPtr = std::shared_ptr; /** * MPPTunnel represents the sender of an exchange connection. @@ -409,6 +443,8 @@ class MPPTunnel : private boost::noncopyable // like `connect` but it's intended to connect async grpc. void connectAsync(IAsyncCallData * data); + void connectUnrefinedLocal(PacketWriter * writer); + // wait until all the data has been transferred. void waitForFinish(); @@ -424,6 +460,7 @@ class MPPTunnel : private boost::noncopyable AsyncTunnelSenderPtr getAsyncTunnelSender() { return async_tunnel_sender; } LocalTunnelSenderPtr getLocalTunnelSender() { return local_tunnel_sender; } LocalTunnelFineGrainedSenderPtr getLocalTunnelFineGrainedSender() { return local_tunnel_fine_grained_sender; } + LocalTunnelSenderUnrefinedPtr getLocalTunnelSenderUnrefined() { return local_tunnel_sender_unrefined; } private: friend class tests::TestMPPTunnel; @@ -474,6 +511,7 @@ class MPPTunnel : private boost::noncopyable AsyncTunnelSenderPtr async_tunnel_sender; LocalTunnelSenderPtr local_tunnel_sender; LocalTunnelFineGrainedSenderPtr local_tunnel_fine_grained_sender; + LocalTunnelSenderUnrefinedPtr local_tunnel_sender_unrefined; std::atomic data_size_in_queue; }; using MPPTunnelPtr = std::shared_ptr; diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 25ee8b9cfde..1f5eb77bf58 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -365,7 +365,8 @@ struct Settings M(SettingUInt64, manual_compact_more_until_ms, 60000, "Continuously compact more segments until reaching specified elapsed time. If 0 is specified, only one segment will be compacted each round.") \ M(SettingUInt64, max_spilled_size_per_spill, 1024ULL * 1024 * 1024, "Max spilled data size per spill, 1GB as the default value.") \ \ - M(SettingBool, enable_planner, true, "Enable planner") + M(SettingBool, enable_planner, true, "Enable planner") \ + M(SettingBool, enable_refined_local_tunnel, true, "Enable refined local tunnel") // clang-format on #define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) TYPE NAME{DEFAULT}; diff --git a/dbms/src/Storages/StorageDisaggregated.cpp b/dbms/src/Storages/StorageDisaggregated.cpp index 6cdb210b45a..9a3febfbe53 100644 --- a/dbms/src/Storages/StorageDisaggregated.cpp +++ b/dbms/src/Storages/StorageDisaggregated.cpp @@ -228,6 +228,7 @@ void StorageDisaggregated::buildReceiverStreams(const std::vectoridentifier(), executor_id, /*fine_grained_shuffle_stream_count=*/0, + context.getSettingsRef().enable_refined_local_tunnel, dispatch_reqs); // MPPTask::receiver_set will record this ExchangeReceiver, so can cancel it in ReceiverSet::cancel(). From 494a0fbc5c0bfc82b36504d525ca39aa6de048c4 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Tue, 31 Jan 2023 14:32:07 +0800 Subject: [PATCH 02/14] merge --- dbms/src/Interpreters/Settings.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index c1f624f5ff2..0959f238746 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -301,7 +301,7 @@ struct Settings \ M(SettingBool, enable_planner, true, "Enable planner") \ M(SettingBool, enable_pipeline, false, "Enable pipeline model") \ - M(SettingUInt64, pipeline_task_thread_pool_size, 0, "The size of task thread pool. 0 means using number_of_logical_cpu_cores.") + M(SettingUInt64, pipeline_task_thread_pool_size, 0, "The size of task thread pool. 0 means using number_of_logical_cpu_cores.") \ M(SettingBool, enable_refined_local_tunnel, true, "Enable refined local tunnel") // clang-format on #define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) TYPE NAME{DEFAULT}; From 121174af0858dcb124f846add9f8266c685c022e Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Wed, 1 Feb 2023 10:52:51 +0800 Subject: [PATCH 03/14] resolve comment --- dbms/src/Debug/dbgQueryExecutor.cpp | 2 +- dbms/src/Flash/Mpp/ExchangeReceiver.cpp | 2 +- dbms/src/Flash/Mpp/GRPCReceiverContext.cpp | 12 +++---- dbms/src/Flash/Mpp/GRPCReceiverContext.h | 4 +-- dbms/src/Flash/Mpp/MPPTask.cpp | 2 +- dbms/src/Flash/Mpp/MPPTunnel.cpp | 14 ++++----- dbms/src/Flash/Mpp/MPPTunnel.h | 33 ++++++++++---------- dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp | 8 ++--- dbms/src/Interpreters/Settings.h | 2 +- dbms/src/Storages/StorageDisaggregated.cpp | 2 +- 10 files changed, 41 insertions(+), 40 deletions(-) diff --git a/dbms/src/Debug/dbgQueryExecutor.cpp b/dbms/src/Debug/dbgQueryExecutor.cpp index 733e1c9edcc..38882024d28 100644 --- a/dbms/src/Debug/dbgQueryExecutor.cpp +++ b/dbms/src/Debug/dbgQueryExecutor.cpp @@ -69,7 +69,7 @@ BlockInputStreamPtr constructExchangeReceiverStream(Context & context, tipb::Exc /*req_id=*/"", /*executor_id=*/"", /*fine_grained_shuffle_stream_count=*/0, - context.getSettings().enable_refined_local_tunnel); + context.getSettings().local_tunnel_version); BlockInputStreamPtr ret = std::make_shared(exchange_receiver, /*req_id=*/"", /*executor_id=*/"", /*stream_id*/ 0); return ret; } diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index 17804e5f4bd..f1ea6ca740b 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -475,7 +475,7 @@ void ExchangeReceiverBase::setUpConnection() }, ReceiverChannelWriter(&(getMsgChannels()), req_info, exc_log, getDataSizeInQueue(), ReceiverMode::Local)); - rpc_context->establishMPPConnectionLocal( + rpc_context->establishMPPConnectionLocalV2( req, req.source_index, local_request_handler, diff --git a/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp b/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp index f3ba0d0182c..6fc95e7b164 100644 --- a/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp +++ b/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp @@ -275,7 +275,7 @@ bool GRPCReceiverContext::supportAsync(const ExchangeRecvRequest & request) cons return enable_async_grpc && !request.is_local; } -void GRPCReceiverContext::establishMPPConnectionLocal( +void GRPCReceiverContext::establishMPPConnectionLocalV2( const ExchangeRecvRequest & request, size_t source_index, LocalRequestHandler & local_request_handler, @@ -289,7 +289,7 @@ void GRPCReceiverContext::establishMPPConnectionLocal( } // TODO remove it in the future -std::tuple GRPCReceiverContext::establishMPPConnectionLocalUnrefined( +std::tuple GRPCReceiverContext::establishMPPConnectionLocalV1( const ::mpp::EstablishMPPConnectionRequest * request, const std::shared_ptr & task_manager) const { @@ -310,9 +310,9 @@ std::tuple GRPCReceiverContext::establishMPPConnecti // TODO remove it in the future struct LocalExchangePacketReader : public ExchangePacketReader { - LocalTunnelSenderUnrefinedPtr local_tunnel_sender; + LocalTunnelSenderV1Ptr local_tunnel_sender; - explicit LocalExchangePacketReader(const LocalTunnelSenderUnrefinedPtr & local_tunnel_sender_) + explicit LocalExchangePacketReader(const LocalTunnelSenderV1Ptr & local_tunnel_sender_) : local_tunnel_sender(local_tunnel_sender_) {} @@ -362,12 +362,12 @@ ExchangePacketReaderPtr GRPCReceiverContext::makeReader(const ExchangeRecvReques { if (request.is_local) { - auto [tunnel, status] = establishMPPConnectionLocalUnrefined(request.req.get(), task_manager); + auto [tunnel, status] = establishMPPConnectionLocalV1(request.req.get(), task_manager); if (!status.ok()) { throw Exception("Exchange receiver meet error : " + status.error_message()); } - return std::make_shared(tunnel->getLocalTunnelSenderUnrefined()); + return std::make_shared(tunnel->getLocalTunnelSenderV1()); } else { diff --git a/dbms/src/Flash/Mpp/GRPCReceiverContext.h b/dbms/src/Flash/Mpp/GRPCReceiverContext.h index 82c1650b830..6cb45f7c118 100644 --- a/dbms/src/Flash/Mpp/GRPCReceiverContext.h +++ b/dbms/src/Flash/Mpp/GRPCReceiverContext.h @@ -103,9 +103,9 @@ class GRPCReceiverContext void fillSchema(DAGSchema & schema) const; - void establishMPPConnectionLocal(const ExchangeRecvRequest & request, size_t source_index, LocalRequestHandler & local_request_handler, bool is_fine_grained); + void establishMPPConnectionLocalV2(const ExchangeRecvRequest & request, size_t source_index, LocalRequestHandler & local_request_handler, bool is_fine_grained); - std::tuple establishMPPConnectionLocalUnrefined(const ::mpp::EstablishMPPConnectionRequest * request, const std::shared_ptr & task_manager) const; + std::tuple establishMPPConnectionLocalV1(const ::mpp::EstablishMPPConnectionRequest * request, const std::shared_ptr & task_manager) const; // Only for tiflash_compute mode, make sure disaggregated_dispatch_reqs is not empty. void sendMPPTaskToTiFlashStorageNode( diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index 1904c40f676..9086d63b2af 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -208,7 +208,7 @@ void MPPTask::initExchangeReceivers() log->identifier(), executor_id, executor.fine_grained_shuffle_stream_count(), - context->getSettings().enable_refined_local_tunnel); + context->getSettings().local_tunnel_version); if (status != RUNNING) throw Exception("exchange receiver map can not be initialized, because the task is not in running state"); diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index d72ebafb691..1db0b031907 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -214,13 +214,13 @@ void MPPTunnel::connectLocal(size_t source_index, LocalRequestHandler & local_re LOG_TRACE(log, "ready to connect local"); if (is_fine_grained) { - local_tunnel_fine_grained_sender = std::make_shared>(source_index, local_request_handler, log, mem_tracker, tunnel_id); - tunnel_sender = local_tunnel_fine_grained_sender; + local_tunnel_fine_grained_sender_v2 = std::make_shared>(source_index, local_request_handler, log, mem_tracker, tunnel_id); + tunnel_sender = local_tunnel_fine_grained_sender_v2; } else { - local_tunnel_sender = std::make_shared>(source_index, local_request_handler, log, mem_tracker, tunnel_id); - tunnel_sender = local_tunnel_sender; + local_tunnel_sender_v2 = std::make_shared>(source_index, local_request_handler, log, mem_tracker, tunnel_id); + tunnel_sender = local_tunnel_sender_v2; } status = TunnelStatus::Connected; @@ -404,8 +404,8 @@ void MPPTunnel::connectUnrefinedLocal(PacketWriter * writer) LOG_TRACE(log, "ready to connect"); RUNTIME_ASSERT(writer == nullptr, log); - local_tunnel_sender_unrefined = std::make_shared(queue_size, mem_tracker, log, tunnel_id, &data_size_in_queue); - tunnel_sender = local_tunnel_sender_unrefined; + local_tunnel_sender_v1 = std::make_shared(queue_size, mem_tracker, log, tunnel_id, &data_size_in_queue); + tunnel_sender = local_tunnel_sender_v1; status = TunnelStatus::Connected; cv_for_status_changed.notify_all(); @@ -413,7 +413,7 @@ void MPPTunnel::connectUnrefinedLocal(PacketWriter * writer) LOG_DEBUG(log, "connected"); } -std::shared_ptr LocalTunnelSenderUnrefined::readForLocal() +std::shared_ptr LocalTunnelSenderV1::readForLocal() { TrackedMppDataPacketPtr res; auto result = send_queue.pop(res); diff --git a/dbms/src/Flash/Mpp/MPPTunnel.h b/dbms/src/Flash/Mpp/MPPTunnel.h index f7b75463644..cfa2c8aae5b 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.h +++ b/dbms/src/Flash/Mpp/MPPTunnel.h @@ -250,10 +250,10 @@ class AsyncTunnelSender : public TunnelSender }; template -class LocalTunnelSender : public TunnelSender +class LocalTunnelSenderV2 : public TunnelSender { public: - LocalTunnelSender( + LocalTunnelSenderV2( size_t source_index_, LocalRequestHandler & local_request_handler_, const LoggerPtr & log_, @@ -267,7 +267,7 @@ class LocalTunnelSender : public TunnelSender local_request_handler.setAlive(); } - ~LocalTunnelSender() override + ~LocalTunnelSenderV2() override { RUNTIME_ASSERT(is_done, "Local tunnel is destructed before called by cancel() or finish()"); @@ -333,15 +333,16 @@ class LocalTunnelSender : public TunnelSender }; // TODO remove it in the future -class LocalTunnelSenderUnrefined : public TunnelSender +class LocalTunnelSenderV1 : public TunnelSender { public: using Base = TunnelSender; using Base::Base; - LocalTunnelSenderUnrefined(size_t queue_size, MemoryTrackerPtr & memory_tracker_, const LoggerPtr & log_, const String & tunnel_id_, std::atomic * data_size_in_queue_) - : TunnelSender(memory_tracker_, log_, tunnel_id_, data_size_in_queue_) - , send_queue(queue_size) {} + LocalTunnelSenderV1(size_t queue_size, MemoryTrackerPtr & memory_tracker_, const LoggerPtr & log_, const String & tunnel_id_, std::atomic * data_size_in_queue_) + : TunnelSender(memory_tracker_, log_, tunnel_id_, data_size_in_queue_) + , send_queue(queue_size) + {} TrackedMppDataPacketPtr readForLocal(); @@ -368,9 +369,9 @@ class LocalTunnelSenderUnrefined : public TunnelSender using TunnelSenderPtr = std::shared_ptr; using SyncTunnelSenderPtr = std::shared_ptr; using AsyncTunnelSenderPtr = std::shared_ptr; -using LocalTunnelSenderPtr = std::shared_ptr>; -using LocalTunnelFineGrainedSenderPtr = std::shared_ptr>; -using LocalTunnelSenderUnrefinedPtr = std::shared_ptr; +using LocalTunnelSenderV2Ptr = std::shared_ptr>; +using LocalTunnelFineGrainedSenderV2Ptr = std::shared_ptr>; +using LocalTunnelSenderV1Ptr = std::shared_ptr; /** * MPPTunnel represents the sender of an exchange connection. @@ -458,9 +459,9 @@ class MPPTunnel : private boost::noncopyable TunnelSenderPtr getTunnelSender() { return tunnel_sender; } SyncTunnelSenderPtr getSyncTunnelSender() { return sync_tunnel_sender; } AsyncTunnelSenderPtr getAsyncTunnelSender() { return async_tunnel_sender; } - LocalTunnelSenderPtr getLocalTunnelSender() { return local_tunnel_sender; } - LocalTunnelFineGrainedSenderPtr getLocalTunnelFineGrainedSender() { return local_tunnel_fine_grained_sender; } - LocalTunnelSenderUnrefinedPtr getLocalTunnelSenderUnrefined() { return local_tunnel_sender_unrefined; } + LocalTunnelSenderV1Ptr getLocalTunnelSenderV1() { return local_tunnel_sender_v1; } + LocalTunnelSenderV2Ptr getLocalTunnelSender() { return local_tunnel_sender_v2; } + LocalTunnelFineGrainedSenderV2Ptr getLocalTunnelFineGrainedSender() { return local_tunnel_fine_grained_sender_v2; } private: friend class tests::TestMPPTunnel; @@ -509,9 +510,9 @@ class MPPTunnel : private boost::noncopyable // According to mode value, among the sync/async/local_tunnel_senders, only the responding sender is not null and do actual work SyncTunnelSenderPtr sync_tunnel_sender; AsyncTunnelSenderPtr async_tunnel_sender; - LocalTunnelSenderPtr local_tunnel_sender; - LocalTunnelFineGrainedSenderPtr local_tunnel_fine_grained_sender; - LocalTunnelSenderUnrefinedPtr local_tunnel_sender_unrefined; + LocalTunnelSenderV1Ptr local_tunnel_sender_v1; + LocalTunnelSenderV2Ptr local_tunnel_sender_v2; + LocalTunnelFineGrainedSenderV2Ptr local_tunnel_fine_grained_sender_v2; std::atomic data_size_in_queue; }; using MPPTunnelPtr = std::shared_ptr; diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp index e07ab938c07..1bde409bc9f 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp @@ -281,10 +281,10 @@ class TestMPPTunnel : public testing::Test static void setTunnelFinished(MPPTunnelPtr tunnel) { tunnel->status = MPPTunnel::TunnelStatus::Finished; - if (tunnel->local_tunnel_sender) - tunnel->local_tunnel_sender->is_done.store(true); - else if (tunnel->local_tunnel_fine_grained_sender) - tunnel->local_tunnel_fine_grained_sender->is_done.store(true); + if (tunnel->local_tunnel_sender_v2) + tunnel->local_tunnel_sender_v2->is_done.store(true); + else if (tunnel->local_tunnel_fine_grained_sender_v2) + tunnel->local_tunnel_fine_grained_sender_v2->is_done.store(true); } static bool getTunnelConnectedFlag(MPPTunnelPtr tunnel) diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 0959f238746..64985731d2d 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -302,7 +302,7 @@ struct Settings M(SettingBool, enable_planner, true, "Enable planner") \ M(SettingBool, enable_pipeline, false, "Enable pipeline model") \ M(SettingUInt64, pipeline_task_thread_pool_size, 0, "The size of task thread pool. 0 means using number_of_logical_cpu_cores.") \ - M(SettingBool, enable_refined_local_tunnel, true, "Enable refined local tunnel") + M(SettingBool, local_tunnel_version, 1, "1: not refined, 2: refined") // clang-format on #define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) TYPE NAME{DEFAULT}; diff --git a/dbms/src/Storages/StorageDisaggregated.cpp b/dbms/src/Storages/StorageDisaggregated.cpp index 9ac0a01feac..0f4ac2bec7f 100644 --- a/dbms/src/Storages/StorageDisaggregated.cpp +++ b/dbms/src/Storages/StorageDisaggregated.cpp @@ -228,7 +228,7 @@ void StorageDisaggregated::buildReceiverStreams(const std::vectoridentifier(), executor_id, /*fine_grained_shuffle_stream_count=*/0, - context.getSettingsRef().enable_refined_local_tunnel, + context.getSettingsRef().local_tunnel_version, dispatch_reqs); // MPPTask::receiver_set will record this ExchangeReceiver, so can cancel it in ReceiverSet::cancel(). From 5e978b5ca154b1e9074566e74e0c0f253c82f17d Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Thu, 2 Feb 2023 11:17:19 +0800 Subject: [PATCH 04/14] fix --- dbms/src/Flash/Mpp/ExchangeReceiver.cpp | 62 +++++++++++++------------ 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index 9b8466691a0..4f47a8e0c80 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -454,39 +454,40 @@ void ExchangeReceiverBase::setUpConnection() async_requests.push_back(std::move(req)); else if (req.is_local) { - String req_info = fmt::format("tunnel{}+{}", req.send_task_id, req.recv_task_id); - - LocalRequestHandler local_request_handler( - getMemoryTracker(), - [this](bool meet_error, const String & local_err_msg) { - this->connectionDone(meet_error, local_err_msg, exc_log); - }, - [this]() { - this->connectionLocalDone(); - }, - [this]() { - this->addLocalConnectionNum(); - }, - ReceiverChannelWriter(&(getMsgChannels()), req_info, exc_log, getDataSizeInQueue(), ReceiverMode::Local)); - - rpc_context->establishMPPConnectionLocal( - req, - req.source_index, - local_request_handler, - enable_fine_grained_shuffle_flag); - --connection_uncreated_num; + if (enable_refined_local) + { + LOG_INFO(exc_log, "refined local tunnel is enabled"); + String req_info = fmt::format("tunnel{}+{}", req.send_task_id, req.recv_task_id); + + LocalRequestHandler local_request_handler( + getMemoryTracker(), + [this](bool meet_error, const String & local_err_msg) { + this->connectionDone(meet_error, local_err_msg, exc_log); + }, + [this]() { + this->connectionLocalDone(); + }, + [this]() { + this->addLocalConnectionNum(); + }, + ReceiverChannelWriter(&(getMsgChannels()), req_info, exc_log, getDataSizeInQueue(), ReceiverMode::Local)); + + rpc_context->establishMPPConnectionLocalV2( + req, + req.source_index, + local_request_handler, + enable_fine_grained_shuffle_flag); + --connection_uncreated_num; + } + else + { + LOG_INFO(exc_log, "refined local tunnel is disabled"); + setUpConnectionWithReadLoop(std::move(req)); + } } else { - thread_manager->schedule(true, "Receiver", [this, req = std::move(req)] { - if (enable_fine_grained_shuffle_flag) - readLoop(req); - else - readLoop(req); - }); - - ++thread_count; - --connection_uncreated_num; + setUpConnectionWithReadLoop(std::move(req)); } } @@ -517,6 +518,7 @@ void ExchangeReceiverBase::setUpConnectionWithReadLoop(ExchangeRecvR }); ++thread_count; + --connection_uncreated_num; } template From 9862130d291b28a211bb9f20884358f6a1412946 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Thu, 2 Feb 2023 11:27:40 +0800 Subject: [PATCH 05/14] fix --- dbms/src/Flash/Mpp/ExchangeReceiver.cpp | 15 +++++++-------- dbms/src/Flash/Mpp/ExchangeReceiver.h | 4 ++-- dbms/src/Server/Server.cpp | 5 +++++ 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index 4f47a8e0c80..7d1cfd7c620 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -308,7 +308,7 @@ ExchangeReceiverBase::ExchangeReceiverBase( const String & req_id, const String & executor_id, uint64_t fine_grained_shuffle_stream_count_, - bool enable_refined_local_, + Int32 local_tunnel_version_, const std::vector & disaggregated_dispatch_reqs_) : rpc_context(std::move(rpc_context_)) , source_num(source_num_) @@ -322,7 +322,7 @@ ExchangeReceiverBase::ExchangeReceiverBase( , state(ExchangeReceiverState::NORMAL) , exc_log(Logger::get(req_id, executor_id)) , collected(false) - , enable_refined_local(enable_refined_local_) + , local_tunnel_version(local_tunnel_version_) , data_size_in_queue(0) , disaggregated_dispatch_reqs(disaggregated_dispatch_reqs_) { @@ -454,7 +454,11 @@ void ExchangeReceiverBase::setUpConnection() async_requests.push_back(std::move(req)); else if (req.is_local) { - if (enable_refined_local) + if (local_tunnel_version == 1) + { + setUpConnectionWithReadLoop(std::move(req)); + } + else { LOG_INFO(exc_log, "refined local tunnel is enabled"); String req_info = fmt::format("tunnel{}+{}", req.send_task_id, req.recv_task_id); @@ -479,11 +483,6 @@ void ExchangeReceiverBase::setUpConnection() enable_fine_grained_shuffle_flag); --connection_uncreated_num; } - else - { - LOG_INFO(exc_log, "refined local tunnel is disabled"); - setUpConnectionWithReadLoop(std::move(req)); - } } else { diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.h b/dbms/src/Flash/Mpp/ExchangeReceiver.h index 7b5b666225c..7e8f243d29d 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.h +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.h @@ -97,7 +97,7 @@ class ExchangeReceiverBase const String & req_id, const String & executor_id, uint64_t fine_grained_shuffle_stream_count, - bool enable_refined_local_, + Int32 local_tunnel_version_, const std::vector & disaggregated_dispatch_reqs_ = {}); ~ExchangeReceiverBase(); @@ -200,7 +200,7 @@ class ExchangeReceiverBase bool collected = false; int thread_count = 0; - bool enable_refined_local; + Int32 local_tunnel_version; std::atomic data_size_in_queue; diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 4dac558477b..432fb983d42 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1174,6 +1174,11 @@ int Server::main(const std::vector & /*args*/) size_t delta_index_cache_size = config().getUInt64("delta_index_cache_size", 0); global_context->setDeltaIndexManager(delta_index_cache_size); + if (global_context->getSettings().local_tunnel_version == 1) + LOG_INFO(log, "enable local tunnel version 1"); + else + LOG_INFO(log, "enable local tunnel version 2"); + /// Set path for format schema files auto format_schema_path = Poco::File(config().getString("format_schema_path", path + "format_schemas/")); global_context->setFormatSchemaPath(format_schema_path.path() + "/"); From bd17938c00e490bd06568f19a20985df070ce118 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Thu, 2 Feb 2023 14:33:45 +0800 Subject: [PATCH 06/14] delete useless log --- dbms/src/Server/Server.cpp | 5 ----- 1 file changed, 5 deletions(-) diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 432fb983d42..4dac558477b 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1174,11 +1174,6 @@ int Server::main(const std::vector & /*args*/) size_t delta_index_cache_size = config().getUInt64("delta_index_cache_size", 0); global_context->setDeltaIndexManager(delta_index_cache_size); - if (global_context->getSettings().local_tunnel_version == 1) - LOG_INFO(log, "enable local tunnel version 1"); - else - LOG_INFO(log, "enable local tunnel version 2"); - /// Set path for format schema files auto format_schema_path = Poco::File(config().getString("format_schema_path", path + "format_schemas/")); global_context->setFormatSchemaPath(format_schema_path.path() + "/"); From d7358a551174901d7197476b022204f165e17e1e Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Thu, 2 Feb 2023 20:13:43 +0800 Subject: [PATCH 07/14] fix --- dbms/src/Interpreters/Settings.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 380e93059b2..13e71e81a20 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -303,7 +303,7 @@ struct Settings M(SettingBool, enable_planner, true, "Enable planner") \ M(SettingBool, enable_pipeline, false, "Enable pipeline model") \ M(SettingUInt64, pipeline_task_thread_pool_size, 0, "The size of task thread pool. 0 means using number_of_logical_cpu_cores.") \ - M(SettingBool, local_tunnel_version, 1, "1: not refined, 2: refined") + M(SettingUInt64, local_tunnel_version, 1, "1: not refined, 2: refined") // clang-format on #define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) TYPE NAME{DEFAULT}; From cf1807b1b26f6a5aa7d7381de12a10ef630ba2aa Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Mon, 6 Feb 2023 15:20:25 +0800 Subject: [PATCH 08/14] update --- dbms/src/Interpreters/Settings.h | 5 ----- 1 file changed, 5 deletions(-) diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index b88546a8bb5..13e71e81a20 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -302,13 +302,8 @@ struct Settings \ M(SettingBool, enable_planner, true, "Enable planner") \ M(SettingBool, enable_pipeline, false, "Enable pipeline model") \ -<<<<<<< HEAD - M(SettingUInt64, pipeline_task_thread_pool_size, 0, "The size of task thread pool. 0 means using number_of_logical_cpu_cores.") \ - M(SettingBool, enable_refined_local_tunnel, true, "Enable refined local tunnel") -======= M(SettingUInt64, pipeline_task_thread_pool_size, 0, "The size of task thread pool. 0 means using number_of_logical_cpu_cores.") \ M(SettingUInt64, local_tunnel_version, 1, "1: not refined, 2: refined") ->>>>>>> d7358a551174901d7197476b022204f165e17e1e // clang-format on #define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) TYPE NAME{DEFAULT}; From 310b11dda41f3aab30217098c9b8f3973679895c Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Mon, 6 Feb 2023 15:30:28 +0800 Subject: [PATCH 09/14] update --- dbms/src/Flash/Mpp/GRPCReceiverContext.cpp | 4 ++-- dbms/src/Flash/Mpp/MPPTunnel.cpp | 12 ++++++------ dbms/src/Flash/Mpp/MPPTunnel.h | 4 ++-- dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp | 6 +++--- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp b/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp index 6fc95e7b164..c0fcbcefd00 100644 --- a/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp +++ b/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp @@ -285,7 +285,7 @@ void GRPCReceiverContext::establishMPPConnectionLocalV2( auto [tunnel, err_msg] = task_manager->findTunnelWithTimeout(request.req.get(), std::chrono::seconds(10)); checkLocalTunnel(tunnel, err_msg); - tunnel->connectLocal(source_index, local_request_handler, is_fine_grained); + tunnel->connectLocalV2(source_index, local_request_handler, is_fine_grained); } // TODO remove it in the future @@ -303,7 +303,7 @@ std::tuple GRPCReceiverContext::establishMPPConnecti { return std::make_tuple(nullptr, grpc::Status(grpc::StatusCode::INTERNAL, "EstablishMPPConnectionLocal into a remote channel!")); } - tunnel->connectUnrefinedLocal(nullptr); + tunnel->connectLocalV1(nullptr); return std::make_tuple(tunnel, grpc::Status::OK); } diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index 1db0b031907..5a56f47368e 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -204,14 +204,14 @@ void MPPTunnel::connectSync(PacketWriter * writer) LOG_DEBUG(log, "Sync tunnel connected"); } -void MPPTunnel::connectLocal(size_t source_index, LocalRequestHandler & local_request_handler, bool is_fine_grained) +void MPPTunnel::connectLocalV2(size_t source_index, LocalRequestHandler & local_request_handler, bool is_fine_grained) { { std::unique_lock lk(mu); RUNTIME_CHECK_MSG(status == TunnelStatus::Unconnected, fmt::format("MPPTunnel has connected or finished: {}", statusToString())); RUNTIME_CHECK_MSG(mode == TunnelSenderMode::LOCAL, "This should be a local tunnel"); - LOG_TRACE(log, "ready to connect local"); + LOG_TRACE(log, "ready to connect local tunnel version 2"); if (is_fine_grained) { local_tunnel_fine_grained_sender_v2 = std::make_shared>(source_index, local_request_handler, log, mem_tracker, tunnel_id); @@ -226,7 +226,7 @@ void MPPTunnel::connectLocal(size_t source_index, LocalRequestHandler & local_re status = TunnelStatus::Connected; cv_for_status_changed.notify_all(); } - LOG_DEBUG(log, "Local tunnel connected"); + LOG_DEBUG(log, "Local tunnel version 2 is connected"); } void MPPTunnel::connectAsync(IAsyncCallData * call_data) @@ -394,14 +394,14 @@ void SyncTunnelSender::startSendThread(PacketWriter * writer) } // TODO remove it in the future -void MPPTunnel::connectUnrefinedLocal(PacketWriter * writer) +void MPPTunnel::connectLocalV1(PacketWriter * writer) { { std::unique_lock lk(mu); if (status != TunnelStatus::Unconnected) throw Exception(fmt::format("MPPTunnel has connected or finished: {}", statusToString())); - LOG_TRACE(log, "ready to connect"); + LOG_TRACE(log, "ready to connect local tunnel version 1"); RUNTIME_ASSERT(writer == nullptr, log); local_tunnel_sender_v1 = std::make_shared(queue_size, mem_tracker, log, tunnel_id, &data_size_in_queue); @@ -410,7 +410,7 @@ void MPPTunnel::connectUnrefinedLocal(PacketWriter * writer) status = TunnelStatus::Connected; cv_for_status_changed.notify_all(); } - LOG_DEBUG(log, "connected"); + LOG_DEBUG(log, "Local tunnel version 1 is connected"); } std::shared_ptr LocalTunnelSenderV1::readForLocal() diff --git a/dbms/src/Flash/Mpp/MPPTunnel.h b/dbms/src/Flash/Mpp/MPPTunnel.h index cfa2c8aae5b..9fb63620e45 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.h +++ b/dbms/src/Flash/Mpp/MPPTunnel.h @@ -439,12 +439,12 @@ class MPPTunnel : private boost::noncopyable // a MPPConn request has arrived. it will build connection by this tunnel; void connectSync(PacketWriter * writer); - void connectLocal(size_t source_index, LocalRequestHandler & local_request_handler, bool is_fine_grained); + void connectLocalV2(size_t source_index, LocalRequestHandler & local_request_handler, bool is_fine_grained); // like `connect` but it's intended to connect async grpc. void connectAsync(IAsyncCallData * data); - void connectUnrefinedLocal(PacketWriter * writer); + void connectLocalV1(PacketWriter * writer); // wait until all the data has been transferred. void waitForFinish(); diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp index 1bde409bc9f..5fa98e93a26 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp @@ -199,7 +199,7 @@ class MockExchangeReceiver }, []() {}, ReceiverChannelWriter(&msg_channels, "", log, &data_size_in_queue, ReceiverMode::Local)); - tunnel->connectLocal(0, local_request_handler, false); + tunnel->connectLocalV2(0, local_request_handler, false); } } @@ -655,7 +655,7 @@ try []() {}, []() {}, ReceiverChannelWriter(nullptr, "", Logger::get(), nullptr, ReceiverMode::Local)); - tunnels[0]->connectLocal(0, local_req_handler, false); + tunnels[0]->connectLocalV2(0, local_req_handler, false); GTEST_FAIL(); } catch (Exception & e) @@ -674,7 +674,7 @@ try []() {}, []() {}, ReceiverChannelWriter(nullptr, "", Logger::get(), nullptr, ReceiverMode::Local)); - tunnels[0]->connectLocal(0, local_req_handler, false); + tunnels[0]->connectLocalV2(0, local_req_handler, false); GTEST_FAIL(); } catch (Exception & e) From 111eb876e04757298e12500be6c6544ba4de7d82 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Mon, 6 Feb 2023 16:27:14 +0800 Subject: [PATCH 10/14] fix --- dbms/src/Flash/Mpp/GRPCReceiverContext.cpp | 2 +- dbms/src/Flash/Mpp/GRPCReceiverContext.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp b/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp index c0fcbcefd00..a5289edf876 100644 --- a/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp +++ b/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp @@ -291,7 +291,7 @@ void GRPCReceiverContext::establishMPPConnectionLocalV2( // TODO remove it in the future std::tuple GRPCReceiverContext::establishMPPConnectionLocalV1( const ::mpp::EstablishMPPConnectionRequest * request, - const std::shared_ptr & task_manager) const + const std::shared_ptr & task_manager) { std::chrono::seconds timeout(10); auto [tunnel, err_msg] = task_manager->findTunnelWithTimeout(request, timeout); diff --git a/dbms/src/Flash/Mpp/GRPCReceiverContext.h b/dbms/src/Flash/Mpp/GRPCReceiverContext.h index 6cb45f7c118..28a187b614d 100644 --- a/dbms/src/Flash/Mpp/GRPCReceiverContext.h +++ b/dbms/src/Flash/Mpp/GRPCReceiverContext.h @@ -105,7 +105,7 @@ class GRPCReceiverContext void establishMPPConnectionLocalV2(const ExchangeRecvRequest & request, size_t source_index, LocalRequestHandler & local_request_handler, bool is_fine_grained); - std::tuple establishMPPConnectionLocalV1(const ::mpp::EstablishMPPConnectionRequest * request, const std::shared_ptr & task_manager) const; + static std::tuple establishMPPConnectionLocalV1(const ::mpp::EstablishMPPConnectionRequest * request, const std::shared_ptr & task_manager); // Only for tiflash_compute mode, make sure disaggregated_dispatch_reqs is not empty. void sendMPPTaskToTiFlashStorageNode( From f2ed2e8059fdb2c58639e63831c5a9e8f09770b5 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Mon, 6 Feb 2023 16:38:59 +0800 Subject: [PATCH 11/14] format --- dbms/src/Flash/Mpp/GRPCReceiverContext.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp b/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp index a5289edf876..985180a9ae1 100644 --- a/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp +++ b/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp @@ -291,7 +291,7 @@ void GRPCReceiverContext::establishMPPConnectionLocalV2( // TODO remove it in the future std::tuple GRPCReceiverContext::establishMPPConnectionLocalV1( const ::mpp::EstablishMPPConnectionRequest * request, - const std::shared_ptr & task_manager) + const std::shared_ptr & task_manager) { std::chrono::seconds timeout(10); auto [tunnel, err_msg] = task_manager->findTunnelWithTimeout(request, timeout); From 841a45a4ab5ecd7ee6749845c6c4099c50f64121 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Tue, 7 Feb 2023 13:28:55 +0800 Subject: [PATCH 12/14] fix ut --- .../tests/gtest_ti_remote_block_inputstream.cpp | 15 +++++++++++++-- dbms/src/Flash/Mpp/ExchangeReceiver.cpp | 2 +- dbms/src/Flash/Mpp/ExchangeReceiver.h | 2 +- 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp index 5fe4a7d8c80..6c88c6e9d10 100644 --- a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp +++ b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp @@ -26,11 +26,13 @@ #include #include #include +#include #include #include #include #include +#include namespace DB @@ -247,7 +249,15 @@ struct MockReceiverContext grpc::CompletionQueue *, UnaryCallback *) const {} - void establishMPPConnectionLocal(const MockReceiverContext::Request &, size_t, LocalRequestHandler &, bool) {} + static ExchangePacketReaderPtr makeReader(const Request &) { return nullptr; } + + static std::tuple establishMPPConnectionLocalV1(const ::mpp::EstablishMPPConnectionRequest *, const std::shared_ptr &) + { + // Useless, just for compilation + return std::make_pair(MPPTunnelPtr(), grpc::Status::CANCELLED); + } + + void establishMPPConnectionLocalV2(const Request &, size_t, LocalRequestHandler &, bool) {} PacketQueuePtr queue; std::vector field_types{}; @@ -440,7 +450,8 @@ class TestTiRemoteBlockInputStream : public testing::Test 1, "mock_req_id", "mock_exchange_receiver_id", - 0); + 0, + 1); auto receiver_stream = std::make_shared( receiver, "mock_req_id", diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index 7d1cfd7c620..6e254bd62dc 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -507,7 +507,7 @@ void ExchangeReceiverBase::setUpConnection() } template -void ExchangeReceiverBase::setUpConnectionWithReadLoop(ExchangeRecvRequest && req) +void ExchangeReceiverBase::setUpConnectionWithReadLoop(Request && req) { thread_manager->schedule(true, "Receiver", [this, req = std::move(req)] { if (enable_fine_grained_shuffle_flag) diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.h b/dbms/src/Flash/Mpp/ExchangeReceiver.h index 7e8f243d29d..8cd72e1c4cb 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.h +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.h @@ -165,7 +165,7 @@ class ExchangeReceiverBase void connectionLocalDone(); void handleConnectionAfterException(); - void setUpConnectionWithReadLoop(ExchangeRecvRequest && req); + void setUpConnectionWithReadLoop(Request && req); bool isReceiverForTiFlashStorage() { From b5728c7839270b4565af19b955cc87e6177eaed2 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Tue, 7 Feb 2023 15:23:41 +0800 Subject: [PATCH 13/14] fix ut --- .../Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp index 6c88c6e9d10..b911ca5855e 100644 --- a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp +++ b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp @@ -451,7 +451,7 @@ class TestTiRemoteBlockInputStream : public testing::Test "mock_req_id", "mock_exchange_receiver_id", 0, - 1); + 2); auto receiver_stream = std::make_shared( receiver, "mock_req_id", From 5102a1650aa42904bc37688cfdc5dac830e9ed92 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Tue, 7 Feb 2023 16:45:48 +0800 Subject: [PATCH 14/14] fix ut --- .../Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp index b911ca5855e..d6d1097d720 100644 --- a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp +++ b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp @@ -249,7 +249,10 @@ struct MockReceiverContext grpc::CompletionQueue *, UnaryCallback *) const {} - static ExchangePacketReaderPtr makeReader(const Request &) { return nullptr; } + std::shared_ptr makeReader(const Request &) + { + return std::make_shared(queue); + } static std::tuple establishMPPConnectionLocalV1(const ::mpp::EstablishMPPConnectionRequest *, const std::shared_ptr &) {