Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add switch for disabling refined local tunnel #6683

Merged
merged 31 commits into from
Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
098658f
init
xzhangxian1008 Jan 29, 2023
5a6584b
merge
xzhangxian1008 Jan 31, 2023
d516427
merge
xzhangxian1008 Jan 31, 2023
494a0fb
merge
xzhangxian1008 Jan 31, 2023
121174a
resolve comment
xzhangxian1008 Feb 1, 2023
53c9626
Merge branch 'master' of ssh://github.com/pingcap/tiflash into switch…
xzhangxian1008 Feb 1, 2023
909e982
Merge branch 'master' of ssh://github.com/pingcap/tiflash into switch…
xzhangxian1008 Feb 2, 2023
075a3d1
merge
xzhangxian1008 Feb 2, 2023
5e978b5
fix
xzhangxian1008 Feb 2, 2023
9862130
fix
xzhangxian1008 Feb 2, 2023
bd17938
delete useless log
xzhangxian1008 Feb 2, 2023
d7358a5
fix
xzhangxian1008 Feb 2, 2023
c9eb9d2
Merge branch 'master' of ssh://github.com/pingcap/tiflash into switch…
xzhangxian1008 Feb 3, 2023
ed505d6
merge
xzhangxian1008 Feb 6, 2023
cf1807b
update
xzhangxian1008 Feb 6, 2023
1b5669d
Merge branch 'master' of https://github.com/pingcap/tiflash into swit…
xzhangxian1008 Feb 6, 2023
310b11d
update
xzhangxian1008 Feb 6, 2023
111eb87
fix
xzhangxian1008 Feb 6, 2023
f2ed2e8
format
xzhangxian1008 Feb 6, 2023
fac66cb
Merge branch 'master' of https://github.com/pingcap/tiflash into swit…
xzhangxian1008 Feb 6, 2023
d7a1a38
Merge branch 'master' into switch_rf_local
ti-chi-bot Feb 7, 2023
841a45a
fix ut
xzhangxian1008 Feb 7, 2023
5464159
Merge branch 'switch_rf_local' of github.com:xzhangxian1008/tiflash i…
xzhangxian1008 Feb 7, 2023
19ade5b
Merge branch 'master' into switch_rf_local
ti-chi-bot Feb 7, 2023
3ec48be
Merge branch 'master' into switch_rf_local
ti-chi-bot Feb 7, 2023
b5728c7
fix ut
xzhangxian1008 Feb 7, 2023
7b3de3c
Merge branch 'switch_rf_local' of github.com:xzhangxian1008/tiflash i…
xzhangxian1008 Feb 7, 2023
153a3b6
Merge branch 'switch_rf_local' of github.com:xzhangxian1008/tiflash i…
xzhangxian1008 Feb 7, 2023
5102a16
fix ut
xzhangxian1008 Feb 7, 2023
f3f36bc
Merge branch 'master' into switch_rf_local
ti-chi-bot Feb 7, 2023
ce8683b
Merge branch 'master' into switch_rf_local
ti-chi-bot Feb 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dbms/src/Debug/dbgQueryExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ BlockInputStreamPtr constructExchangeReceiverStream(Context & context, tipb::Exc
10,
/*req_id=*/"",
/*executor_id=*/"",
/*fine_grained_shuffle_stream_count=*/0);
/*fine_grained_shuffle_stream_count=*/0,
context.getSettings().enable_refined_local_tunnel);
BlockInputStreamPtr ret = std::make_shared<ExchangeReceiverInputStream>(exchange_receiver, /*req_id=*/"", /*executor_id=*/"", /*stream_id*/ 0);
return ret;
}
Expand Down
78 changes: 48 additions & 30 deletions dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ ExchangeReceiverBase<RPCContext>::ExchangeReceiverBase(
const String & req_id,
const String & executor_id,
uint64_t fine_grained_shuffle_stream_count_,
bool enable_refined_local_,
const std::vector<StorageDisaggregated::RequestAndRegionIDs> & disaggregated_dispatch_reqs_)
: rpc_context(std::move(rpc_context_))
, source_num(source_num_)
Expand All @@ -320,6 +321,7 @@ ExchangeReceiverBase<RPCContext>::ExchangeReceiverBase(
, state(ExchangeReceiverState::NORMAL)
, exc_log(Logger::get(req_id, executor_id))
, collected(false)
, enable_refined_local(enable_refined_local_)
, data_size_in_queue(0)
, disaggregated_dispatch_reqs(disaggregated_dispatch_reqs_)
{
Expand Down Expand Up @@ -455,37 +457,39 @@ void ExchangeReceiverBase<RPCContext>::setUpConnection()
async_requests.push_back(std::move(req));
else if (req.is_local)
{
String req_info = fmt::format("tunnel{}+{}", req.send_task_id, req.recv_task_id);

LocalRequestHandler local_request_handler(
getMemoryTracker(),
[this](bool meet_error, const String & local_err_msg) {
this->connectionDone(meet_error, local_err_msg, exc_log);
},
[this]() {
this->connectionLocalDone();
},
[this]() {
this->addLocalConnectionNum();
},
ReceiverChannelWriter(&(getMsgChannels()), req_info, exc_log, getDataSizeInQueue(), ReceiverMode::Local));

rpc_context->establishMPPConnectionLocal(
req,
req.source_index,
local_request_handler,
enable_fine_grained_shuffle_flag);
if (enable_refined_local)
{
LOG_INFO(exc_log, "refined local tunnel is enabled");
String req_info = fmt::format("tunnel{}+{}", req.send_task_id, req.recv_task_id);

LocalRequestHandler local_request_handler(
getMemoryTracker(),
[this](bool meet_error, const String & local_err_msg) {
this->connectionDone(meet_error, local_err_msg, exc_log);
},
[this]() {
this->connectionLocalDone();
},
[this]() {
this->addLocalConnectionNum();
},
ReceiverChannelWriter(&(getMsgChannels()), req_info, exc_log, getDataSizeInQueue(), ReceiverMode::Local));

rpc_context->establishMPPConnectionLocal(
req,
req.source_index,
local_request_handler,
enable_fine_grained_shuffle_flag);
}
else
{
LOG_INFO(exc_log, "refined local tunnel is disabled");
setUpConnectionWithReadLoop(std::move(req));
}
}
else
{
thread_manager->schedule(true, "Receiver", [this, req = std::move(req)] {
if (enable_fine_grained_shuffle_flag)
readLoop<true>(req);
else
readLoop<false>(req);
});

++thread_count;
setUpConnectionWithReadLoop(std::move(req));
}
}

Expand All @@ -503,6 +507,19 @@ void ExchangeReceiverBase<RPCContext>::setUpConnection()
}
}

/// Hands `req` to `thread_manager` as a task named "Receiver" that drains the
/// request through readLoop, then bumps `thread_count` by one.
///
/// The request is moved into the scheduled closure. Which readLoop
/// instantiation runs is decided by `enable_fine_grained_shuffle_flag`
/// at the moment the closure executes.
template <typename RPCContext>
void ExchangeReceiverBase<RPCContext>::setUpConnectionWithReadLoop(ExchangeRecvRequest && req)
{
    auto read_task = [this, request = std::move(req)] {
        if (!enable_fine_grained_shuffle_flag)
        {
            readLoop<false>(request);
            return;
        }
        readLoop<true>(request);
    };
    thread_manager->schedule(true, "Receiver", std::move(read_task));

    thread_count += 1;
}

template <typename RPCContext>
template <bool enable_fine_grained_shuffle>
void ExchangeReceiverBase<RPCContext>::reactor(const std::vector<Request> & async_requests)
Expand Down Expand Up @@ -567,10 +584,11 @@ void ExchangeReceiverBase<RPCContext>::readLoop(const Request & req)
try
{
auto status = RPCContext::getStatusOK();
ReceiverChannelWriter channel_writer(&msg_channels, req_info, log, &data_size_in_queue, ReceiverMode::Sync);
ReceiverMode recv_mode = req.is_local ? ReceiverMode::Local : ReceiverMode::Sync;
ReceiverChannelWriter channel_writer(&msg_channels, req_info, log, &data_size_in_queue, recv_mode);
for (int i = 0; i < max_retry_times; ++i)
{
auto reader = rpc_context->makeSyncReader(req);
auto reader = rpc_context->makeReader(req);
bool has_data = false;
for (;;)
{
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Flash/Mpp/ExchangeReceiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class ExchangeReceiverBase
const String & req_id,
const String & executor_id,
uint64_t fine_grained_shuffle_stream_count,
bool enable_refined_local_,
const std::vector<StorageDisaggregated::RequestAndRegionIDs> & disaggregated_dispatch_reqs_ = {});

~ExchangeReceiverBase();
Expand Down Expand Up @@ -166,6 +167,8 @@ class ExchangeReceiverBase

void connectionLocalDone();

void setUpConnectionWithReadLoop(ExchangeRecvRequest && req);

bool isReceiverForTiFlashStorage()
{
// If not empty, need to send MPPTask to tiflash_storage.
Expand Down Expand Up @@ -198,6 +201,7 @@ class ExchangeReceiverBase

bool collected = false;
int thread_count = 0;
bool enable_refined_local;

std::atomic<Int64> data_size_in_queue;

Expand Down
92 changes: 92 additions & 0 deletions dbms/src/Flash/Mpp/GRPCReceiverContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,98 @@ void GRPCReceiverContext::establishMPPConnectionLocal(
tunnel->connectLocal(source_index, local_request_handler, is_fine_grained);
}

// TODO remove it in the future
/// Finds the tunnel matching `request` through `task_manager` (lookup timeout: 10 seconds)
/// and connects it with MPPTunnel::connectUnrefinedLocal.
///
/// Returns:
///  - {lookup result, INTERNAL + lookup error message} when no tunnel was found;
///  - {nullptr, INTERNAL} when the tunnel found is not a local one;
///  - {tunnel, OK} once the tunnel has been connected.
std::tuple<MPPTunnelPtr, grpc::Status> GRPCReceiverContext::establishMPPConnectionLocalUnrefined(
    const ::mpp::EstablishMPPConnectionRequest * request,
    const std::shared_ptr<MPPTaskManager> & task_manager) const
{
    using ConnectResult = std::tuple<MPPTunnelPtr, grpc::Status>;

    std::chrono::seconds find_timeout(10);
    auto [tunnel, err_msg] = task_manager->findTunnelWithTimeout(request, find_timeout);

    if (tunnel == nullptr)
        return ConnectResult(tunnel, grpc::Status(grpc::StatusCode::INTERNAL, err_msg));

    const bool is_local_tunnel = tunnel->isLocal();
    if (!is_local_tunnel)
        return ConnectResult(nullptr, grpc::Status(grpc::StatusCode::INTERNAL, "EstablishMPPConnectionLocal into a remote channel!"));

    // The unrefined local path has no packet writer, hence the null argument.
    tunnel->connectUnrefinedLocal(nullptr);
    return ConnectResult(tunnel, grpc::Status::OK);
}

// TODO remove it in the future
struct LocalExchangePacketReader : public ExchangePacketReader
{
LocalTunnelSenderUnrefinedPtr local_tunnel_sender;

explicit LocalExchangePacketReader(const LocalTunnelSenderUnrefinedPtr & local_tunnel_sender_)
: local_tunnel_sender(local_tunnel_sender_)
{}

/// put the implementation of dtor in .cpp so we don't need to put the specialization of
/// pingcap::kv::RpcCall<mpp::EstablishMPPConnectionRequest> in header file.
~LocalExchangePacketReader() override
{
if (local_tunnel_sender)
{
// In case that ExchangeReceiver throw error before finish reading from mpp_tunnel
local_tunnel_sender->consumerFinish("Receiver exists");
local_tunnel_sender.reset();
}
}

bool read(TrackedMppDataPacketPtr & packet) override
{
TrackedMppDataPacketPtr tmp_packet = local_tunnel_sender->readForLocal();
bool success = tmp_packet != nullptr;
if (success)
packet = tmp_packet;
return success;
}

void cancel(const String & reason) override
{
if (local_tunnel_sender)
{
local_tunnel_sender->consumerFinish(fmt::format("Receiver cancelled, reason: {}", reason));
local_tunnel_sender.reset();
}
}

grpc::Status finish() override
{
if (local_tunnel_sender)
{
local_tunnel_sender->consumerFinish("Receiver finished!");
local_tunnel_sender.reset();
}
return ::grpc::Status::OK;
}
};

// TODO remove it in the future
/// Builds the packet reader for `request`.
///
/// Remote requests get a GrpcExchangePacketReader whose stream is opened with
/// `cluster->rpc_client->sendStreamRequest` against the sender's address.
/// Local requests are connected through establishMPPConnectionLocalUnrefined and
/// get a LocalExchangePacketReader wrapping the tunnel's unrefined local sender.
///
/// Throws Exception when the local connection attempt returns a non-OK status.
ExchangePacketReaderPtr GRPCReceiverContext::makeReader(const ExchangeRecvRequest & request) const
{
    if (!request.is_local)
    {
        auto grpc_reader = std::make_shared<GrpcExchangePacketReader>(request);
        const auto & sender_address = request.req->sender_meta().address();
        grpc_reader->reader = cluster->rpc_client->sendStreamRequest(
            sender_address,
            &grpc_reader->client_context,
            *grpc_reader->call);
        return grpc_reader;
    }

    auto [tunnel, status] = establishMPPConnectionLocalUnrefined(request.req.get(), task_manager);
    if (status.ok())
        return std::make_shared<LocalExchangePacketReader>(tunnel->getLocalTunnelSenderUnrefined());

    throw Exception("Exchange receiver meet error : " + status.error_message());
}

ExchangePacketReaderPtr GRPCReceiverContext::makeSyncReader(const ExchangeRecvRequest & request) const
{
auto reader = std::make_shared<GrpcExchangePacketReader>(request);
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Flash/Mpp/GRPCReceiverContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ class GRPCReceiverContext

bool supportAsync(const ExchangeRecvRequest & request) const;

ExchangePacketReaderPtr makeReader(const ExchangeRecvRequest & request) const;

ExchangePacketReaderPtr makeSyncReader(const ExchangeRecvRequest & request) const;

void makeAsyncReader(
Expand All @@ -103,6 +105,8 @@ class GRPCReceiverContext

void establishMPPConnectionLocal(const ExchangeRecvRequest & request, size_t source_index, LocalRequestHandler & local_request_handler, bool is_fine_grained);

std::tuple<MPPTunnelPtr, grpc::Status> establishMPPConnectionLocalUnrefined(const ::mpp::EstablishMPPConnectionRequest * request, const std::shared_ptr<MPPTaskManager> & task_manager) const;

// Only for tiflash_compute mode, make sure disaggregated_dispatch_reqs is not empty.
void sendMPPTaskToTiFlashStorageNode(
LoggerPtr log,
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,9 @@ void MPPTask::initExchangeReceivers()
context->getMaxStreams(),
log->identifier(),
executor_id,
executor.fine_grained_shuffle_stream_count());
executor.fine_grained_shuffle_stream_count(),
context->getSettings().enable_refined_local_tunnel);

if (status != RUNNING)
throw Exception("exchange receiver map can not be initialized, because the task is not in running state");

Expand Down
46 changes: 46 additions & 0 deletions dbms/src/Flash/Mpp/MPPTunnel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,4 +392,50 @@ void SyncTunnelSender::startSendThread(PacketWriter * writer)
sendJob(writer);
});
}

// TODO remove it in the future
/// Connects this tunnel using the unrefined local-tunnel implementation.
///
/// Under `mu`: verifies the tunnel is still Unconnected, creates a
/// LocalTunnelSenderUnrefined, installs it as both `local_tunnel_sender_unrefined`
/// and `tunnel_sender`, flips the status to Connected and wakes every waiter on
/// `cv_for_status_changed`.
///
/// `writer` must be nullptr (enforced by RUNTIME_ASSERT).
///
/// Throws Exception when the tunnel is not in the Unconnected state.
void MPPTunnel::connectUnrefinedLocal(PacketWriter * writer)
{
    {
        std::unique_lock lk(mu);
        if (status != TunnelStatus::Unconnected)
            throw Exception(fmt::format("MPPTunnel has connected or finished: {}", statusToString()));

        // Name the tunnel implementation in the logs so that the refined and unrefined
        // local paths can be told apart when the switch is toggled.
        LOG_TRACE(log, "ready to connect unrefined local tunnel");

        RUNTIME_ASSERT(writer == nullptr, log);
        local_tunnel_sender_unrefined = std::make_shared<LocalTunnelSenderUnrefined>(queue_size, mem_tracker, log, tunnel_id, &data_size_in_queue);
        tunnel_sender = local_tunnel_sender_unrefined;

        status = TunnelStatus::Connected;
        cv_for_status_changed.notify_all();
    }
    LOG_DEBUG(log, "unrefined local tunnel is connected");
}

std::shared_ptr<DB::TrackedMppDataPacket> LocalTunnelSenderUnrefined::readForLocal()
{
TrackedMppDataPacketPtr res;
auto result = send_queue.pop(res);
if (result == MPMCQueueResult::OK)
{
MPPTunnelMetric::subDataSizeMetric(*data_size_in_queue, res->getPacket().ByteSizeLong());

// switch tunnel's memory tracker into receiver's
res->switchMemTracker(current_memory_tracker);
return res;
}
else if (result == MPMCQueueResult::CANCELLED)
{
RUNTIME_ASSERT(!send_queue.getCancelReason().empty(), "Tunnel sender cancelled without reason");
if (!cancel_reason_sent)
{
cancel_reason_sent = true;
res = std::make_shared<TrackedMppDataPacket>(getPacketWithError(send_queue.getCancelReason()), current_memory_tracker);
return res;
}
}
consumerFinish("");
return nullptr;
}
} // namespace DB
Loading