Skip to content

Commit

Permalink
Pipeline: support async exchange sender/receiver (#6854)
Browse files Browse the repository at this point in the history
ref #6518
  • Loading branch information
SeaRise authored Feb 28, 2023
1 parent 9ea9cf7 commit ce1b09b
Show file tree
Hide file tree
Showing 58 changed files with 1,564 additions and 411 deletions.
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/TiRemoteBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
protected:
void readSuffixImpl() override
{
LOG_DEBUG(log, "finish read {} rows from remote", total_rows);
LOG_INFO(log, "finish read {} rows from remote", total_rows);
}

void appendInfo(FmtBuffer & buffer) const override
Expand Down
15 changes: 13 additions & 2 deletions dbms/src/Debug/MockExecutor/ExchangeReceiverBinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <Debug/MockExecutor/AstToPB.h>
#include <Debug/MockExecutor/AstToPBUtils.h>
#include <Debug/MockExecutor/ExchangeReceiverBinder.h>
#include <Debug/MockExecutor/ExchangeSenderBinder.h>
#include <Debug/MockExecutor/ExecutorBinder.h>
#include <Storages/Transaction/TiDB.h>
#include <kvproto/mpp.pb.h>
Expand All @@ -29,6 +30,9 @@ bool ExchangeReceiverBinder::toTiPBExecutor(tipb::Executor * tipb_executor, int3
tipb_executor->set_fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count);
tipb::ExchangeReceiver * exchange_receiver = tipb_executor->mutable_exchange_receiver();

if (exchange_sender)
exchange_receiver->set_tp(exchange_sender->getType());

for (auto & field : output_schema)
{
auto tipb_type = TiDB::columnInfoToFieldType(field.second);
Expand Down Expand Up @@ -61,9 +65,16 @@ bool ExchangeReceiverBinder::toTiPBExecutor(tipb::Executor * tipb_executor, int3
}


ExecutorBinderPtr compileExchangeReceiver(size_t & executor_index, DAGSchema schema, uint64_t fine_grained_shuffle_stream_count)
/// Adds this receiver and its paired exchange sender to `exchange_map`.
///
/// The sender's own `toMPPSubPlan` is invoked first with the same arguments; afterwards
/// the (receiver, sender) pair is stored under `name`, overwriting any existing entry.
/// The runtime check fails when no exchange sender was bound to this receiver.
void ExchangeReceiverBinder::toMPPSubPlan(size_t & executor_index, const DAGProperties & properties, std::unordered_map<String, std::pair<std::shared_ptr<ExchangeReceiverBinder>, std::shared_ptr<ExchangeSenderBinder>>> & exchange_map)
{
    RUNTIME_CHECK_MSG(exchange_sender, "exchange_sender must not be nullptr in toMPPSubPlan");

    // The sender is processed before this receiver is recorded in the map.
    exchange_sender->toMPPSubPlan(executor_index, properties, exchange_map);

    auto receiver_and_sender = std::make_pair(shared_from_this(), exchange_sender);
    exchange_map[name] = std::move(receiver_and_sender);
}

ExecutorBinderPtr compileExchangeReceiver(size_t & executor_index, DAGSchema schema, uint64_t fine_grained_shuffle_stream_count, const std::shared_ptr<ExchangeSenderBinder> & exchange_sender)
{
ExecutorBinderPtr exchange_receiver = std::make_shared<mock::ExchangeReceiverBinder>(executor_index, schema, fine_grained_shuffle_stream_count);
ExecutorBinderPtr exchange_receiver = std::make_shared<mock::ExchangeReceiverBinder>(executor_index, schema, fine_grained_shuffle_stream_count, exchange_sender);
return exchange_receiver;
}
} // namespace DB::mock
13 changes: 11 additions & 2 deletions dbms/src/Debug/MockExecutor/ExchangeReceiverBinder.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,30 @@
namespace DB::mock
{
class ExchangeReceiverBinder : public ExecutorBinder
, public std::enable_shared_from_this<ExchangeReceiverBinder>
{
public:
ExchangeReceiverBinder(size_t & index, const DAGSchema & output, uint64_t fine_grained_shuffle_stream_count_ = 0)
ExchangeReceiverBinder(
size_t & index,
const DAGSchema & output,
uint64_t fine_grained_shuffle_stream_count_ = 0,
const std::shared_ptr<ExchangeSenderBinder> & exchange_sender_ = nullptr)
: ExecutorBinder(index, "exchange_receiver_" + std::to_string(index), output)
, fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count_)
, exchange_sender(exchange_sender_)
{}

bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context &) override;

void columnPrune(std::unordered_set<String> &) override {}

void toMPPSubPlan(size_t & executor_index, const DAGProperties &, std::unordered_map<String, std::pair<std::shared_ptr<ExchangeReceiverBinder>, std::shared_ptr<ExchangeSenderBinder>>> & exchange_map) override;

private:
TaskMetas task_metas;
uint64_t fine_grained_shuffle_stream_count;
std::shared_ptr<ExchangeSenderBinder> exchange_sender;
};

ExecutorBinderPtr compileExchangeReceiver(size_t & executor_index, DAGSchema schema, uint64_t fine_grained_shuffle_stream_count);
ExecutorBinderPtr compileExchangeReceiver(size_t & executor_index, DAGSchema schema, uint64_t fine_grained_shuffle_stream_count, const std::shared_ptr<ExchangeSenderBinder> & exchange_sender);
} // namespace DB::mock
5 changes: 2 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,6 @@ void DAGQueryBlockInterpreter::executeExpand(DAGPipeline & pipeline, const Expre

void DAGQueryBlockInterpreter::handleExchangeSender(DAGPipeline & pipeline)
{
RUNTIME_ASSERT(dagContext().isMPPTask() && dagContext().tunnel_set != nullptr, log, "exchange_sender only run in MPP");
/// exchange sender should be at the top of operators
const auto & exchange_sender = query_block.exchange_sender->exchange_sender();
std::vector<Int64> partition_col_ids = ExchangeSenderInterpreterHelper::genPartitionColIds(exchange_sender);
Expand All @@ -776,7 +775,6 @@ void DAGQueryBlockInterpreter::handleExchangeSender(DAGPipeline & pipeline)
pipeline.transform([&](auto & stream) {
// construct writer
std::unique_ptr<DAGResponseWriter> response_writer = newMPPExchangeWriter(
dagContext().tunnel_set,
partition_col_ids,
partition_col_collators,
exchange_sender.tp(),
Expand All @@ -787,7 +785,8 @@ void DAGQueryBlockInterpreter::handleExchangeSender(DAGPipeline & pipeline)
stream_count,
batch_size,
exchange_sender.compression(),
context.getSettingsRef().batch_send_min_limit_compression);
context.getSettingsRef().batch_send_min_limit_compression,
log->identifier());
stream = std::make_shared<ExchangeSenderBlockInputStream>(stream, std::move(response_writer), log->identifier());
stream->setExtraInfo(extra_info);
});
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGResponseWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ class DAGResponseWriter
/// prepared with sample block
virtual void prepare(const Block &){};
virtual void write(const Block & block) = 0;

// For an async writer, `isReadyForWrite` needs to be called before calling `write`:
// ```
// while (!isReadyForWrite()) {}
// write(block);
// ```
// This default implementation always throws, so a writer that is meant to be
// polled this way is expected to override it.
virtual bool isReadyForWrite() const { throw Exception("Unsupport"); }

/// flush cached blocks for batch writer
virtual void flush() = 0;
virtual ~DAGResponseWriter() = default;
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/StreamWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ struct StreamWriter
if (!writer->Write(resp))
throw Exception("Failed to write resp");
}
// Always throws: this writer reports async write as unsupported, so it never returns a readiness value.
bool isReadyForWrite() const { throw Exception("Unsupport async write"); }
};

using StreamWriterPtr = std::shared_ptr<StreamWriter>;
Expand Down
11 changes: 9 additions & 2 deletions dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include <Flash/Coprocessor/DefaultChunkCodec.h>
#include <Flash/Coprocessor/StreamWriter.h>
#include <Flash/Coprocessor/StreamingDAGResponseWriter.h>
#include <Flash/Mpp/MPPTunnelSet.h>
#include <Flash/Mpp/MPPTunnelSetWriter.h>

namespace DB
{
Expand Down Expand Up @@ -68,6 +68,12 @@ void StreamingDAGResponseWriter<StreamWriterPtr>::flush()
encodeThenWriteBlocks();
}

// Readiness query for the async write protocol: forwards directly to the wrapped
// `writer`, so the result (or any exception it throws) is whatever that writer produces.
template <class StreamWriterPtr>
bool StreamingDAGResponseWriter<StreamWriterPtr>::isReadyForWrite() const
{
return writer->isReadyForWrite();
}

template <class StreamWriterPtr>
void StreamingDAGResponseWriter<StreamWriterPtr>::write(const Block & block)
{
Expand Down Expand Up @@ -141,5 +147,6 @@ void StreamingDAGResponseWriter<StreamWriterPtr>::encodeThenWriteBlocks()
}

template class StreamingDAGResponseWriter<StreamWriterPtr>;
template class StreamingDAGResponseWriter<MPPTunnelSetPtr>;
template class StreamingDAGResponseWriter<SyncMPPTunnelSetWriterPtr>;
template class StreamingDAGResponseWriter<AsyncMPPTunnelSetWriterPtr>;
} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class StreamingDAGResponseWriter : public DAGResponseWriter
Int64 batch_send_min_limit_,
DAGContext & dag_context_);
void write(const Block & block) override;
bool isReadyForWrite() const override;
void flush() override;

private:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ struct MockStreamWriter
{}

void write(tipb::SelectResponse & response) { checker(response); }
// Always throws: this mock writer reports async write as unsupported.
bool isReadyForWrite() const { throw Exception("Unsupport async write"); }

private:
MockStreamWriterChecker checker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,12 @@ struct MockWriter
tracked_packet->serializeByResponse(response);
queue->push(tracked_packet);
}
void sendExecutionSummary(const tipb::SelectResponse & response)
{
tipb::SelectResponse tmp = response;
write(tmp);
}
uint16_t getPartitionNum() const { return 1; }
bool isLocal(size_t index) const
{
return index == 0;
}
// Always throws: this mock writer reports async write as unsupported.
bool isReadyForWrite() const { throw Exception("Unsupport async write"); }

std::vector<tipb::FieldType> result_field_types;

Expand Down Expand Up @@ -371,7 +367,8 @@ class TestTiRemoteBlockInputStream : public testing::Test
// 3. send execution summary
writer->add_summary = true;
ExecutionSummaryCollector summary_collector(*dag_context_ptr);
writer->sendExecutionSummary(summary_collector.genExecutionSummaryResponse());
auto summary_response = summary_collector.genExecutionSummaryResponse();
writer->write(summary_response);
}

void prepareQueueV2(
Expand Down
12 changes: 9 additions & 3 deletions dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Mpp/BroadcastOrPassThroughWriter.h>
#include <Flash/Mpp/MPPTunnelSet.h>
#include <Flash/Mpp/MPPTunnelSetWriter.h>

namespace DB
{
Expand All @@ -40,6 +40,12 @@ void BroadcastOrPassThroughWriter<ExchangeWriterPtr>::flush()
writeBlocks();
}

// Readiness query for the async write protocol: forwards directly to the wrapped
// exchange `writer` and returns its answer unchanged.
template <class ExchangeWriterPtr>
bool BroadcastOrPassThroughWriter<ExchangeWriterPtr>::isReadyForWrite() const
{
return writer->isReadyForWrite();
}

template <class ExchangeWriterPtr>
void BroadcastOrPassThroughWriter<ExchangeWriterPtr>::write(const Block & block)
{
Expand Down Expand Up @@ -68,6 +74,6 @@ void BroadcastOrPassThroughWriter<ExchangeWriterPtr>::writeBlocks()
rows_in_blocks = 0;
}

template class BroadcastOrPassThroughWriter<MPPTunnelSetPtr>;

template class BroadcastOrPassThroughWriter<SyncMPPTunnelSetWriterPtr>;
template class BroadcastOrPassThroughWriter<AsyncMPPTunnelSetWriterPtr>;
} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class BroadcastOrPassThroughWriter : public DAGResponseWriter
Int64 batch_send_min_limit_,
DAGContext & dag_context_);
void write(const Block & block) override;
bool isReadyForWrite() const override;
void flush() override;

private:
Expand Down
89 changes: 75 additions & 14 deletions dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -707,34 +707,95 @@ DecodeDetail ExchangeReceiverBase<RPCContext>::decodeChunks(
}

template <typename RPCContext>
ExchangeReceiverResult ExchangeReceiverBase<RPCContext>::nextResult(
std::queue<Block> & block_queue,
const Block & header,
ReceiveResult ExchangeReceiverBase<RPCContext>::receive(size_t stream_id)
{
    // Take the next message with the channel's `pop`; the range check on `stream_id`
    // and the translation of the queue result into a ReceiveResult are done by the
    // two-argument overload.
    const auto pop_from_channel = [this](size_t channel_idx, std::shared_ptr<ReceivedMessage> & out_msg) {
        return msg_channels[channel_idx]->pop(out_msg);
    };
    return receive(stream_id, pop_from_channel);
}

/// Variant of `receive` that takes the next message with the channel's `tryPop`.
/// The range check on `stream_id` and the translation of the queue result into a
/// ReceiveResult are done by the two-argument `receive` overload.
template <typename RPCContext>
ReceiveResult ExchangeReceiverBase<RPCContext>::nonBlockingReceive(size_t stream_id)
{
    const auto try_pop_from_channel = [this](size_t channel_idx, std::shared_ptr<ReceivedMessage> & out_msg) {
        return msg_channels[channel_idx]->tryPop(out_msg);
    };
    return receive(stream_id, try_pop_from_channel);
}

template <typename RPCContext>
ReceiveResult ExchangeReceiverBase<RPCContext>::receive(
size_t stream_id,
std::unique_ptr<CHBlockChunkDecodeAndSquash> & decoder_ptr)
std::function<MPMCQueueResult(size_t, std::shared_ptr<ReceivedMessage> &)> recv_func)
{
if (unlikely(stream_id >= msg_channels.size()))
{
LOG_ERROR(exc_log, "stream_id out of range, stream_id: {}, total_stream_count: {}", stream_id, msg_channels.size());
return ExchangeReceiverResult::newError(0, "", "stream_id out of range");
auto err_msg = fmt::format("stream_id out of range, stream_id: {}, total_channel_count: {}", stream_id, msg_channels.size());
LOG_ERROR(exc_log, err_msg);
throw Exception(err_msg);
}

std::shared_ptr<ReceivedMessage> recv_msg;
if (msg_channels[stream_id]->pop(recv_msg) != MPMCQueueResult::OK)
switch (recv_func(stream_id, recv_msg))
{
return handleUnnormalChannel(block_queue, decoder_ptr);
case MPMCQueueResult::OK:
assert(recv_msg);
return {ReceiveStatus::ok, std::move(recv_msg)};
case MPMCQueueResult::EMPTY:
return {ReceiveStatus::empty, nullptr};
default:
return {ReceiveStatus::eof, nullptr};
}
else
}

template <typename RPCContext>
ExchangeReceiverResult ExchangeReceiverBase<RPCContext>::toExchangeReceiveResult(
ReceiveResult & recv_result,
std::queue<Block> & block_queue,
const Block & header,
std::unique_ptr<CHBlockChunkDecodeAndSquash> & decoder_ptr)
{
switch (recv_result.recv_status)
{
case ReceiveStatus::ok:
{
assert(recv_msg != nullptr);
if (unlikely(recv_msg->error_ptr != nullptr))
return ExchangeReceiverResult::newError(recv_msg->source_index, recv_msg->req_info, recv_msg->error_ptr->msg());
assert(recv_result.recv_msg != nullptr);
if (unlikely(recv_result.recv_msg->error_ptr != nullptr))
return ExchangeReceiverResult::newError(
recv_result.recv_msg->source_index,
recv_result.recv_msg->req_info,
recv_result.recv_msg->error_ptr->msg());

ExchangeReceiverMetric::subDataSizeMetric(data_size_in_queue, recv_msg->packet->getPacket().ByteSizeLong());
return toDecodeResult(block_queue, header, recv_msg, decoder_ptr);
ExchangeReceiverMetric::subDataSizeMetric(
data_size_in_queue,
recv_result.recv_msg->packet->getPacket().ByteSizeLong());
return toDecodeResult(block_queue, header, recv_result.recv_msg, decoder_ptr);
}
case ReceiveStatus::eof:
return handleUnnormalChannel(block_queue, decoder_ptr);
case ReceiveStatus::empty:
throw Exception("Unexpected recv status: empty");
}
}

/// Receives one message for `stream_id` via `receive` and converts the outcome into an
/// ExchangeReceiverResult with `toExchangeReceiveResult`, forwarding `block_queue`,
/// `header` and `decoder_ptr` unchanged.
template <typename RPCContext>
ExchangeReceiverResult ExchangeReceiverBase<RPCContext>::nextResult(
    std::queue<Block> & block_queue,
    const Block & header,
    size_t stream_id,
    std::unique_ptr<CHBlockChunkDecodeAndSquash> & decoder_ptr)
{
    // A named local is needed: toExchangeReceiveResult takes the result by non-const reference.
    ReceiveResult received = receive(stream_id);
    return toExchangeReceiveResult(received, block_queue, header, decoder_ptr);
}

template <typename RPCContext>
ExchangeReceiverResult ExchangeReceiverBase<RPCContext>::handleUnnormalChannel(
std::queue<Block> & block_queue,
Expand Down
26 changes: 26 additions & 0 deletions dbms/src/Flash/Mpp/ExchangeReceiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,19 @@ enum class ExchangeReceiverState
CLOSED,
};

// Outcome of one attempt to take a message from a receiver message channel.
enum class ReceiveStatus
{
// The queue reported MPMCQueueResult::EMPTY: no message was obtained.
empty,
// The queue reported MPMCQueueResult::OK: a message was obtained.
ok,
// The queue reported any other result; toExchangeReceiveResult routes this to handleUnnormalChannel.
eof,
};

// Pairs the status of a receive attempt with the message it produced, if any.
struct ReceiveResult
{
// Outcome of the attempt.
ReceiveStatus recv_status;
// The received message. The producer in ExchangeReceiver.cpp sets it for
// ReceiveStatus::ok and passes nullptr for the other statuses.
std::shared_ptr<ReceivedMessage> recv_msg;
};

template <typename RPCContext>
class ExchangeReceiverBase
{
Expand All @@ -105,6 +118,15 @@ class ExchangeReceiverBase
void cancel();
void close();

ReceiveResult receive(size_t stream_id);
ReceiveResult nonBlockingReceive(size_t stream_id);

ExchangeReceiverResult toExchangeReceiveResult(
ReceiveResult & recv_result,
std::queue<Block> & block_queue,
const Block & header,
std::unique_ptr<CHBlockChunkDecodeAndSquash> & decoder_ptr);

ExchangeReceiverResult nextResult(
std::queue<Block> & block_queue,
const Block & header,
Expand Down Expand Up @@ -159,6 +181,10 @@ class ExchangeReceiverBase
const std::shared_ptr<ReceivedMessage> & recv_msg,
std::unique_ptr<CHBlockChunkDecodeAndSquash> & decoder_ptr);

ReceiveResult receive(
size_t stream_id,
std::function<MPMCQueueResult(size_t, std::shared_ptr<ReceivedMessage> &)> recv_func);

private:
void prepareMsgChannels();
void addLocalConnectionNum();
Expand Down
Loading

0 comments on commit ce1b09b

Please sign in to comment.