Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline: use notify instead of polling for ExchangeReceiver #9073

Merged
merged 40 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
fa590d8
add notify_future for mpptunnel
SeaRise May 22, 2024
62d3add
add for writer
SeaRise May 22, 2024
e7d8c9b
for exchange sender
SeaRise May 22, 2024
2ffee76
remove writable
SeaRise May 22, 2024
f76b7bd
Merge branch 'master' into notify_for_sender
SeaRise May 22, 2024
563573b
try fmt
SeaRise May 22, 2024
e1e920c
refine ut
SeaRise May 22, 2024
2f258ae
for receiver
SeaRise May 22, 2024
f8e1dc6
u
SeaRise May 22, 2024
e2adf5e
fix
SeaRise May 22, 2024
c5655c6
format
Lloyd-Pottiger May 23, 2024
da81c3c
Update gtest_mpptunnel.cpp
SeaRise May 23, 2024
249cfa9
Merge branch 'master' into notify_for_sender
SeaRise May 23, 2024
2c12e6c
Merge branch 'master' into notify_for_receiver
SeaRise May 23, 2024
03dd7b6
fix comment
SeaRise May 27, 2024
d2814b8
Merge branch 'master' into notify_for_sender
SeaRise May 30, 2024
c0d784e
Merge branch 'master' into notify_for_receiver
SeaRise May 30, 2024
d9640df
Merge branch 'master' into notify_for_sender
SeaRise Jun 5, 2024
df23506
fix may hang issue
SeaRise Jun 5, 2024
0c43a11
some fix
SeaRise Jun 5, 2024
94d954b
fix ut
SeaRise Jun 5, 2024
dd1316f
refine err msg
SeaRise Jun 5, 2024
22ea753
Merge branch 'notify_for_sender' into notify_for_receiver
SeaRise Jun 5, 2024
902291b
add cancel
SeaRise Jun 5, 2024
4ddcf98
Merge branch 'master' into notify_for_sender
SeaRise Jun 6, 2024
95fdb7f
Merge branch 'master' into notify_for_receiver
SeaRise Jun 6, 2024
ee4ad65
Merge branch 'notify_for_sender' into notify_for_receiver
SeaRise Jun 6, 2024
7f878ce
u
SeaRise Jun 6, 2024
d4eac21
Merge branch 'master' into notify_for_receiver
SeaRise Aug 26, 2024
ec2838f
Merge branch 'master' into notify_for_receiver
SeaRise Sep 2, 2024
bd66511
Update ReceivedMessageQueue.cpp
SeaRise Sep 4, 2024
d13644d
Update DAGContext.h
SeaRise Sep 18, 2024
1f2423f
Update PipelineExecutorContext.cpp
SeaRise Sep 18, 2024
8da6615
Merge branch 'master' into notify_for_receiver
windtalker Sep 23, 2024
b38899f
Merge branch 'master' into notify_for_receiver
gengliqi Sep 27, 2024
c6033a8
update
gengliqi Sep 27, 2024
bb18d78
update
gengliqi Sep 27, 2024
c85801b
update name
gengliqi Sep 27, 2024
275134b
Merge branch 'master' into notify_for_receiver
gengliqi Sep 27, 2024
af512d3
fix
gengliqi Sep 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,8 @@ class DAGContext
UInt64 getConnectionID() const { return connection_id; }
const String & getConnectionAlias() const { return connection_alias; }

MPPReceiverSetPtr getMPPReceiverSet() const { return mpp_receiver_set; }

public:
DAGRequest dag_request;
/// Some existing code inherited from Clickhouse assume that each query must have a valid query string and query ast,
Expand Down Expand Up @@ -443,6 +445,7 @@ class DAGContext
/// warning_count is the actual warning count during the entire execution
std::atomic<UInt64> warning_count;

// `mpp_receiver_set` is always set by `MPPTask` and is used later.
MPPReceiverSetPtr mpp_receiver_set;
std::vector<CoprocessorReaderPtr> coprocessor_readers;
/// vector of SubqueriesForSets(such as join build subquery).
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Flash/Executor/PipelineExecutorContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Executor/PipelineExecutorContext.h>
#include <Flash/Executor/ResultQueue.h>
#include <Flash/Mpp/MPPReceiverSet.h>
#include <Flash/Mpp/MPPTunnelSet.h>
#include <Flash/Mpp/Utils.h>
#include <Flash/Pipeline/Schedule/TaskScheduler.h>
Expand Down Expand Up @@ -178,9 +179,12 @@ void PipelineExecutorContext::cancel()
cancelSharedQueues();
if (likely(dag_context))
{
// Cancel the tunnel_set here to prevent pipeline tasks waiting in the WAIT_FOR_NOTIFY state from never being notified.
// Cancel the tunnel_set and mpp_receiver_set here to prevent
// pipeline tasks waiting in the WAIT_FOR_NOTIFY state from never being notified.
if (dag_context->tunnel_set)
dag_context->tunnel_set->close(getTrimmedErrMsg(), false);
if (auto mpp_receiver_set = dag_context->getMPPReceiverSet(); mpp_receiver_set)
mpp_receiver_set->cancel();
}
cancelResultQueueIfNeed();
if likely (TaskScheduler::instance && !query_id.empty())
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Flash/Mpp/LocalRequestHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ struct LocalRequestHandler

bool isWritable() const { return msg_queue->isWritable(); }

void registerPipeReadTask(TaskPtr && task) const { msg_queue->registerPipeReadTask(std::move(task)); }
void registerPipeWriteTask(TaskPtr && task) const { msg_queue->registerPipeWriteTask(std::move(task)); }

void writeDone(bool meet_error, const String & local_err_msg) const
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/MPPTunnel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ WaitResult MPPTunnel::waitForWritable() const
RUNTIME_CHECK_MSG(tunnel_sender != nullptr, "write to tunnel {} which is already closed.", tunnel_id);
if (!tunnel_sender->isWritable())
{
setNotifyFuture(tunnel_sender);
setNotifyFuture(tunnel_sender.get());
return WaitResult::WaitForNotify;
}
return WaitResult::Ready;
Expand Down
36 changes: 25 additions & 11 deletions dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ ReceivedMessageQueue::ReceivedMessageQueue(
: std::function<void(const ReceivedMessagePtr &)>([this](const ReceivedMessagePtr & element) {
for (size_t i = 0; i < fine_grained_channel_size; ++i)
{
auto result = msg_channels_for_fine_grained_shuffle[i]->forcePush(element);
auto result = msg_channels_for_fine_grained_shuffle[i].forcePush(element);
RUNTIME_CHECK_MSG(result == MPMCQueueResult::OK, "push to fine grained channel must success");
}
}))
Expand All @@ -114,9 +114,7 @@ ReceivedMessageQueue::ReceivedMessageQueue(
assert(fine_grained_channel_size > 0);
msg_channels_for_fine_grained_shuffle.reserve(fine_grained_channel_size);
for (size_t i = 0; i < fine_grained_channel_size; ++i)
/// these are unbounded queues
msg_channels_for_fine_grained_shuffle.push_back(
std::make_shared<LooseBoundedMPMCQueue<ReceivedMessagePtr>>(std::numeric_limits<size_t>::max()));
msg_channels_for_fine_grained_shuffle.emplace_back();
}
}

Expand All @@ -127,9 +125,9 @@ MPMCQueueResult ReceivedMessageQueue::pop(size_t stream_id, ReceivedMessagePtr &
if (fine_grained_channel_size > 0)
{
if constexpr (need_wait)
res = msg_channels_for_fine_grained_shuffle[stream_id]->pop(recv_msg);
res = msg_channels_for_fine_grained_shuffle[stream_id].pop(recv_msg);
else
res = msg_channels_for_fine_grained_shuffle[stream_id]->tryPop(recv_msg);
res = msg_channels_for_fine_grained_shuffle[stream_id].tryPop(recv_msg);

if (res == MPMCQueueResult::OK)
{
Expand All @@ -151,6 +149,15 @@ MPMCQueueResult ReceivedMessageQueue::pop(size_t stream_id, ReceivedMessagePtr &
#else
grpc_recv_queue.tryDequeue();
#endif
ExchangeReceiverMetric::subDataSizeMetric(*data_size_in_queue, recv_msg->getPacket().ByteSizeLong());
}
}
else
{
if constexpr (!need_wait)
{
if (res == MPMCQueueResult::EMPTY)
setNotifyFuture(&msg_channels_for_fine_grained_shuffle[stream_id]);
}
}
}
Expand All @@ -160,13 +167,20 @@ MPMCQueueResult ReceivedMessageQueue::pop(size_t stream_id, ReceivedMessagePtr &
res = grpc_recv_queue.pop(recv_msg);
else
res = grpc_recv_queue.tryPop(recv_msg);
}

if (res == MPMCQueueResult::OK)
{
ExchangeReceiverMetric::subDataSizeMetric(*data_size_in_queue, recv_msg->getPacket().ByteSizeLong());
if (res == MPMCQueueResult::OK)
{
ExchangeReceiverMetric::subDataSizeMetric(*data_size_in_queue, recv_msg->getPacket().ByteSizeLong());
}
else
{
if constexpr (!need_wait)
{
if (res == MPMCQueueResult::EMPTY)
setNotifyFuture(&grpc_recv_queue);
}
}
}

return res;
}

Expand Down
70 changes: 65 additions & 5 deletions dbms/src/Flash/Mpp/ReceivedMessageQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
#include <Common/TiFlashMetrics.h>
#include <Flash/Mpp/ReceivedMessage.h>
#include <Flash/Mpp/TrackedMppDataPacket.h>
#include <Flash/Pipeline/Schedule/Tasks/NotifyFuture.h>

#include <memory>
#include <utility>

namespace DB
{
Expand Down Expand Up @@ -55,6 +57,65 @@ enum class ReceiverMode
Async
};

class GRPCNotifyQueue : public NotifyFuture
{
public:
template <typename... Args>
explicit GRPCNotifyQueue(const LoggerPtr & log_, Args &&... args)
: queue(log_, std::forward<Args>(args)...)
{}

void registerTask(TaskPtr && task) override { queue.registerPipeReadTask(std::move(task)); }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe rename it to registerPipelineReadTask?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because this is an override of NotifyFuture::registerTask, it cannot be renamed


MPMCQueueResult pop(ReceivedMessagePtr & data) { return queue.pop(data); }

MPMCQueueResult tryPop(ReceivedMessagePtr & data) { return queue.tryPop(data); }

MPMCQueueResult forcePush(ReceivedMessagePtr && data) { return queue.forcePush(std::move(data)); }

MPMCQueueResult push(ReceivedMessagePtr && data) { return queue.push(std::move(data)); }

MPMCQueueResult tryDequeue() { return queue.tryDequeue(); }

MPMCQueueResult pushWithTag(ReceivedMessagePtr && data, GRPCKickTag * new_tag)
{
return queue.pushWithTag(std::move(data), new_tag);
}

void setKickFuncForTest(GRPCKickFunc && func) { queue.setKickFuncForTest(std::move(func)); }

bool finish() { return queue.finish(); }
bool cancel() { return queue.cancel(); }

bool isWritable() const { return queue.isWritable(); }

void registerPipeWriteTask(TaskPtr && task) { queue.registerPipeWriteTask(std::move(task)); }

private:
GRPCRecvQueue<ReceivedMessagePtr> queue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about directly letting GRPCRecvNotifyQueue inherit GRPCRecvQueue<ReceivedMessagePtr>? If so, these functions do not need to be written, only the registerTask function is required.

};

class MSGChannel : public NotifyFuture
{
public:
void registerTask(TaskPtr && task) override { queue_ref.registerPipeReadTask(std::move(task)); }

MPMCQueueResult pop(ReceivedMessagePtr & data) { return queue_ref.pop(data); }

MPMCQueueResult tryPop(ReceivedMessagePtr & data) { return queue_ref.tryPop(data); }

MPMCQueueResult forcePush(const ReceivedMessagePtr & data) { return queue_ref.forcePush(data); }

bool finish() { return queue_ref.finish(); }
bool cancel() { return queue_ref.cancel(); }

private:
using QueueImpl = LooseBoundedMPMCQueue<ReceivedMessagePtr>;
// these are unbounded queues.
std::unique_ptr<QueueImpl> queue = std::make_unique<QueueImpl>(std::numeric_limits<size_t>::max());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use unique_ptr? Also, how about letting MSGChannel inherits LooseBoundedMPMCQueue<ReceivedMessagePtr>?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ref #7963,
Because LooseBoundedMPMCQueue includes locks, it must use unique_ptr or shared_ptr somewhere.

QueueImpl & queue_ref = *queue;
};

class ReceivedMessageQueue
{
public:
Expand Down Expand Up @@ -86,20 +147,19 @@ class ReceivedMessageQueue
grpc_recv_queue.finish();
/// msg_channels_for_fine_grained_shuffle must be finished after msg_channel is finished
for (auto & channel : msg_channels_for_fine_grained_shuffle)
channel->finish();
channel.finish();
}

void cancel()
{
grpc_recv_queue.cancel();
/// msg_channels_for_fine_grained_shuffle must be cancelled after msg_channel is cancelled
for (auto & channel : msg_channels_for_fine_grained_shuffle)
channel->cancel();
channel.cancel();
}

bool isWritable() const { return grpc_recv_queue.isWritable(); }

void registerPipeReadTask(TaskPtr && task) { grpc_recv_queue.registerPipeReadTask(std::move(task)); }
void registerPipeWriteTask(TaskPtr && task) { grpc_recv_queue.registerPipeWriteTask(std::move(task)); }

#ifndef DBMS_PUBLIC_GTEST
Expand All @@ -118,8 +178,8 @@ class ReceivedMessageQueue
/// write: the writer first write the msg to msg_channel/grpc_recv_queue, if write success, then write msg to msg_channels_for_fine_grained_shuffle
/// read: the reader read msg from msg_channels_for_fine_grained_shuffle, and reduce the `remaining_consumers` in msg, if `remaining_consumers` is 0, then
/// remove the msg from msg_channel/grpc_recv_queue
std::vector<std::shared_ptr<LooseBoundedMPMCQueue<ReceivedMessagePtr>>> msg_channels_for_fine_grained_shuffle;
GRPCRecvQueue<ReceivedMessagePtr> grpc_recv_queue;
std::vector<MSGChannel> msg_channels_for_fine_grained_shuffle;
GRPCNotifyQueue grpc_recv_queue;
};

} // namespace DB
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ ExecTaskStatus StreamRestoreTask::tryFlush()
t_block.clear();
return ExecTaskStatus::IO_IN;
case MPMCQueueResult::FULL:
setNotifyFuture(sink);
setNotifyFuture(sink.get());
return ExecTaskStatus::WAIT_FOR_NOTIFY;
case MPMCQueueResult::CANCELLED:
return ExecTaskStatus::CANCELLED;
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,23 @@

namespace DB
{
thread_local NotifyFuturePtr current_notify_future = nullptr;
thread_local NotifyFuture * current_notify_future = nullptr;

void setNotifyFuture(NotifyFuturePtr new_future)
void setNotifyFuture(NotifyFuture * new_future)
{
assert(current_notify_future == nullptr);
current_notify_future = std::move(new_future);
}

void clearNotifyFuture()
{
current_notify_future.reset();
current_notify_future = nullptr;
}

void registerTaskToFuture(TaskPtr && task)
{
assert(current_notify_future != nullptr);
current_notify_future->registerTask(std::move(task));
current_notify_future.reset();
current_notify_future = nullptr;
}
} // namespace DB
5 changes: 2 additions & 3 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@ struct NotifyFuture
virtual ~NotifyFuture() = default;
virtual void registerTask(TaskPtr && task) = 0;
};
using NotifyFuturePtr = std::shared_ptr<NotifyFuture>;

extern thread_local NotifyFuturePtr current_notify_future;
extern thread_local NotifyFuture * current_notify_future;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this change may increase the risk of using a object that have been already released, is it by design that the current_notify_future will not be released during its lifecycle?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, current_notify_future is only released when the pipeline task changes from wait-for-notify to running


void setNotifyFuture(NotifyFuturePtr new_future);
void setNotifyFuture(NotifyFuture * new_future);
void clearNotifyFuture();
void registerTaskToFuture(TaskPtr && task);

Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ namespace DB
* CANCELLED/ERROR/FINISHED
* ▲
* │
* ───────────────────────────────┐
* │ ┌──►RUNNING◄──┐ │
* INIT───►│ │ │ │
* ▼ │
* │ WATITING◄────────►IO_IN/OUT │
* ───────────────────────────────┘
* ┌───────────────────────────────────────────────┐
* │ ┌──────────►RUNNING◄──────────┐ │
* │ │ │ │
* ▼ │
* │ WATITING/WAIT_FOR_NOTIFY◄────────►IO_IN/OUT │
* └───────────────────────────────────────────────┘
*/
enum class ExecTaskStatus
{
Expand Down
Loading