Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

.*: refine streaming writer and exchange writer #6186

Merged
merged 27 commits into from
Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGBlockOutputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ DAGBlockOutputStream::DAGBlockOutputStream(Block && header_, std::unique_ptr<DAG

void DAGBlockOutputStream::writePrefix()
{
//something to do here?
response_writer->prepare(header);
ywqzzy marked this conversation as resolved.
Show resolved Hide resolved
}

void DAGBlockOutputStream::write(const Block & block)
Expand All @@ -34,6 +34,7 @@ void DAGBlockOutputStream::write(const Block & block)
/// Ends the output stream by calling flush() and then finishWrite() on the
/// response writer, in that order.
void DAGBlockOutputStream::writeSuffix()
{
// TODO: handle errors raised by flush() / finishWrite().
// flush() has to run before finishWrite(): e.g. StreamingDAGResponseWriter::finishWrite()
// asserts that no rows are still buffered (`0 == rows_in_blocks`).
response_writer->flush();
response_writer->finishWrite();
}

Expand Down
12 changes: 1 addition & 11 deletions dbms/src/Flash/Coprocessor/StreamWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,7 @@ struct StreamWriter
explicit StreamWriter(::grpc::ServerWriter<::coprocessor::BatchResponse> * writer_)
: writer(writer_)
{}
void write(mpp::MPPDataPacket &)
{
throw Exception("StreamWriter::write(mpp::MPPDataPacket &) do not support writing MPPDataPacket!");
}
void write(mpp::MPPDataPacket &, [[maybe_unused]] uint16_t)
{
throw Exception("StreamWriter::write(mpp::MPPDataPacket &, [[maybe_unused]] uint16_t) do not support writing MPPDataPacket!");
}
void write(tipb::SelectResponse & response, [[maybe_unused]] uint16_t id = 0)
void write(tipb::SelectResponse & response)
{
::coprocessor::BatchResponse resp;
if (!response.SerializeToString(resp.mutable_data()))
Expand All @@ -59,8 +51,6 @@ struct StreamWriter
if (!writer->Write(resp))
throw Exception("Failed to write resp");
}
// a helper function
uint16_t getPartitionNum() { return 0; }
};

using StreamWriterPtr = std::shared_ptr<StreamWriter>;
ywqzzy marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
36 changes: 15 additions & 21 deletions dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,21 +65,24 @@ StreamingDAGResponseWriter<StreamWriterPtr>::StreamingDAGResponseWriter(
template <class StreamWriterPtr>
void StreamingDAGResponseWriter<StreamWriterPtr>::finishWrite()
{
assert(0 == rows_in_blocks);
if (should_send_exec_summary_at_last)
{
encodeThenWriteBlocks<true>();
}
else
{
encodeThenWriteBlocks<false>();
}
sendExecutionSummary();
}

/// Builds a fresh tipb::SelectResponse, lets `summary_collector` add the execution
/// summaries to it (with delta_mode = true), and hands the response to `writer`.
/// No block data is attached here; the response is written exactly as the
/// collector filled it.
template <class StreamWriterPtr>
void StreamingDAGResponseWriter<StreamWriterPtr>::sendExecutionSummary()
{
tipb::SelectResponse response;
summary_collector.addExecuteSummaries(response, /*delta_mode=*/true);
writer->write(response);
}

template <class StreamWriterPtr>
void StreamingDAGResponseWriter<StreamWriterPtr>::flush()
{
if (rows_in_blocks > 0)
encodeThenWriteBlocks<false>();
encodeThenWriteBlocks();
}

template <class StreamWriterPtr>
Expand All @@ -96,26 +99,17 @@ void StreamingDAGResponseWriter<StreamWriterPtr>::write(const Block & block)
}

if (static_cast<Int64>(rows_in_blocks) > batch_send_min_limit)
encodeThenWriteBlocks<false>();
encodeThenWriteBlocks();
}

template <class StreamWriterPtr>
template <bool send_exec_summary_at_last>
void StreamingDAGResponseWriter<StreamWriterPtr>::encodeThenWriteBlocks()
{
TrackedSelectResp response;
if constexpr (send_exec_summary_at_last)
summary_collector.addExecuteSummaries(response.getResponse(), /*delta_mode=*/true);
response.setEncodeType(dag_context.encode_type);
if (blocks.empty())
{
if constexpr (send_exec_summary_at_last)
{
writer->write(response.getResponse());
}
if (unlikely(blocks.empty()))
return;
}

TrackedSelectResp response;
response.setEncodeType(dag_context.encode_type);
if (dag_context.encode_type == tipb::EncodeType::TypeCHBlock)
{
/// passthrough data to a non-TiFlash node, like sending data to TiSpark
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@ class StreamingDAGResponseWriter : public DAGResponseWriter
void finishWrite() override;

private:
template <bool send_exec_summary_at_last>
void encodeThenWriteBlocks();

void sendExecutionSummary();

private:
Int64 batch_send_min_limit;
bool should_send_exec_summary_at_last; /// Only one stream needs to send the execution summaries at the end.
StreamWriterPtr writer;
Expand Down
24 changes: 8 additions & 16 deletions dbms/src/Flash/Coprocessor/tests/gtest_streaming_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,15 @@ class TestStreamingWriter : public testing::Test
std::unique_ptr<DAGContext> dag_context_ptr;
};

using MockStreamWriterChecker = std::function<void(tipb::SelectResponse &, uint16_t)>;
using MockStreamWriterChecker = std::function<void(tipb::SelectResponse &)>;

struct MockStreamWriter
{
explicit MockStreamWriter(MockStreamWriterChecker checker_)
: checker(checker_)
{}

void write(mpp::MPPDataPacket &) { FAIL() << "cannot reach here."; }
void write(mpp::MPPDataPacket &, uint16_t) { FAIL() << "cannot reach here."; }
void write(tipb::SelectResponse & response, uint16_t part_id) { checker(response, part_id); }
void write(tipb::SelectResponse & response) { checker(response, 0); }
uint16_t getPartitionNum() const { return 1; }
void write(tipb::SelectResponse & response) { checker(response); }

private:
MockStreamWriterChecker checker;
Expand All @@ -124,8 +120,6 @@ try
const size_t block_num = 64;
const size_t batch_send_min_limit = 108;

const bool should_send_exec_summary_at_last = true;

// 1. Build Blocks.
std::vector<Block> blocks;
for (size_t i = 0; i < block_num; ++i)
Expand All @@ -137,8 +131,7 @@ try

// 2. Build MockStreamWriter.
std::vector<tipb::SelectResponse> write_report;
auto checker = [&write_report](tipb::SelectResponse & response, uint16_t part_id) {
ASSERT_EQ(part_id, 0);
auto checker = [&write_report](tipb::SelectResponse & response) {
write_report.emplace_back(std::move(response));
};
auto mock_writer = std::make_shared<MockStreamWriter>(checker);
Expand All @@ -148,7 +141,7 @@ try
mock_writer,
batch_send_min_limit,
batch_send_min_limit,
should_send_exec_summary_at_last,
/*should_send_exec_summary_at_last=*/false,
*dag_context_ptr);
for (const auto & block : blocks)
dag_writer->write(block);
Expand Down Expand Up @@ -184,7 +177,7 @@ try
}
CATCH

TEST_F(TestStreamingWriter, emptyBlock)
TEST_F(TestStreamingWriter, testSendExecutionSummary)
try
{
std::vector<tipb::EncodeType> encode_types{
Expand All @@ -196,20 +189,19 @@ try
dag_context_ptr->encode_type = encode_type;

std::vector<tipb::SelectResponse> write_report;
auto checker = [&write_report](tipb::SelectResponse & response, uint16_t part_id) {
ASSERT_EQ(part_id, 0);
auto checker = [&write_report](tipb::SelectResponse & response) {
write_report.emplace_back(std::move(response));
};
auto mock_writer = std::make_shared<MockStreamWriter>(checker);

const size_t batch_send_min_limit = 5;
const bool should_send_exec_summary_at_last = true;
auto dag_writer = std::make_shared<StreamingDAGResponseWriter<std::shared_ptr<MockStreamWriter>>>(
mock_writer,
batch_send_min_limit,
batch_send_min_limit,
should_send_exec_summary_at_last,
/*should_send_exec_summary_at_last=*/true,
*dag_context_ptr);
dag_writer->flush();
dag_writer->finishWrite();

// For `should_send_exec_summary_at_last = true`, there is at least one packet used to pass execution summary.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,9 @@ struct MockWriter
return summary;
}

void write(mpp::MPPDataPacket &, uint16_t) { FAIL() << "cannot reach here."; }
void write(mpp::MPPDataPacket & packet)
void partitionWrite(const TrackedMppDataPacketPtr &, uint16_t) { FAIL() << "cannot reach here."; }
void broadcastOrPassThroughWrite(const TrackedMppDataPacketPtr & packet)
{
auto tracked_packet = std::make_shared<TrackedMppDataPacket>(packet, nullptr);
if (add_summary)
{
tipb::SelectResponse response;
Expand All @@ -73,14 +72,13 @@ struct MockWriter
summary_ptr->set_num_iterations(summary.num_iterations);
summary_ptr->set_concurrency(summary.concurrency);
summary_ptr->set_executor_id("Executor_0");
tracked_packet->serializeByResponse(response);
packet->serializeByResponse(response);
}
++total_packets;
if (!tracked_packet->packet.chunks().empty())
total_bytes += tracked_packet->packet.ByteSizeLong();
queue->push(tracked_packet);
if (!packet->packet.chunks().empty())
total_bytes += packet->packet.ByteSizeLong();
queue->push(packet);
}
void write(tipb::SelectResponse &, uint16_t) { FAIL() << "cannot reach here."; }
void write(tipb::SelectResponse & response)
{
++total_packets;
Expand All @@ -91,6 +89,7 @@ struct MockWriter
tracked_packet->serializeByResponse(response);
queue->push(tracked_packet);
}
void sendExecutionSummary(tipb::SelectResponse & response) { write(response); }
uint16_t getPartitionNum() const { return 1; }

PacketQueuePtr queue;
Expand Down Expand Up @@ -303,6 +302,7 @@ class TestTiRemoteBlockInputStream : public testing::Test
// 2. encode all blocks
for (const auto & block : source_blocks)
dag_writer->write(block);
dag_writer->flush();
writer->add_summary = true;
dag_writer->finishWrite();
}
Expand Down
65 changes: 28 additions & 37 deletions dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

namespace DB
{
template <class StreamWriterPtr>
BroadcastOrPassThroughWriter<StreamWriterPtr>::BroadcastOrPassThroughWriter(
StreamWriterPtr writer_,
template <class ExchangeWriterPtr>
BroadcastOrPassThroughWriter<ExchangeWriterPtr>::BroadcastOrPassThroughWriter(
ExchangeWriterPtr writer_,
Int64 batch_send_min_limit_,
bool should_send_exec_summary_at_last_,
DAGContext & dag_context_)
Expand All @@ -35,28 +35,31 @@ BroadcastOrPassThroughWriter<StreamWriterPtr>::BroadcastOrPassThroughWriter(
chunk_codec_stream = std::make_unique<CHBlockChunkCodec>()->newCodecStream(dag_context.result_field_types);
}

template <class StreamWriterPtr>
void BroadcastOrPassThroughWriter<StreamWriterPtr>::finishWrite()
template <class ExchangeWriterPtr>
void BroadcastOrPassThroughWriter<ExchangeWriterPtr>::finishWrite()
{
assert(0 == rows_in_blocks);
if (should_send_exec_summary_at_last)
{
encodeThenWriteBlocks<true>();
}
else
{
encodeThenWriteBlocks<false>();
}
sendExecutionSummary();
}

/// Builds a fresh tipb::SelectResponse, lets `summary_collector` add the execution
/// summaries to it (with delta_mode = false), and passes the response to the
/// exchange writer's sendExecutionSummary(). How the response is delivered is
/// left entirely to the writer.
template <class ExchangeWriterPtr>
void BroadcastOrPassThroughWriter<ExchangeWriterPtr>::sendExecutionSummary()
{
tipb::SelectResponse response;
summary_collector.addExecuteSummaries(response, /*delta_mode=*/false);
writer->sendExecutionSummary(response);
}

template <class StreamWriterPtr>
void BroadcastOrPassThroughWriter<StreamWriterPtr>::flush()
template <class ExchangeWriterPtr>
void BroadcastOrPassThroughWriter<ExchangeWriterPtr>::flush()
{
if (rows_in_blocks > 0)
encodeThenWriteBlocks<false>();
encodeThenWriteBlocks();
}

template <class StreamWriterPtr>
void BroadcastOrPassThroughWriter<StreamWriterPtr>::write(const Block & block)
template <class ExchangeWriterPtr>
void BroadcastOrPassThroughWriter<ExchangeWriterPtr>::write(const Block & block)
{
RUNTIME_CHECK_MSG(
block.columns() == dag_context.result_field_types.size(),
Expand All @@ -69,39 +72,27 @@ void BroadcastOrPassThroughWriter<StreamWriterPtr>::write(const Block & block)
}

if (static_cast<Int64>(rows_in_blocks) > batch_send_min_limit)
encodeThenWriteBlocks<false>();
encodeThenWriteBlocks();
}

template <class StreamWriterPtr>
template <bool send_exec_summary_at_last>
void BroadcastOrPassThroughWriter<StreamWriterPtr>::encodeThenWriteBlocks()
template <class ExchangeWriterPtr>
void BroadcastOrPassThroughWriter<ExchangeWriterPtr>::encodeThenWriteBlocks()
{
TrackedMppDataPacket tracked_packet(current_memory_tracker);
if constexpr (send_exec_summary_at_last)
{
TrackedSelectResp response;
summary_collector.addExecuteSummaries(response.getResponse(), /*delta_mode=*/false);
tracked_packet.serializeByResponse(response.getResponse());
}
if (blocks.empty())
{
if constexpr (send_exec_summary_at_last)
{
writer->write(tracked_packet.getPacket());
}
if (unlikely(blocks.empty()))
return;
}

auto tracked_packet = std::make_shared<TrackedMppDataPacket>();
while (!blocks.empty())
{
const auto & block = blocks.back();
chunk_codec_stream->encode(block, 0, block.rows());
blocks.pop_back();
tracked_packet.addChunk(chunk_codec_stream->getString());
tracked_packet->addChunk(chunk_codec_stream->getString());
chunk_codec_stream->clear();
}
assert(blocks.empty());
rows_in_blocks = 0;
writer->write(tracked_packet.getPacket());
writer->broadcastOrPassThroughWrite(tracked_packet);
}

template class BroadcastOrPassThroughWriter<MPPTunnelSetPtr>;
Expand Down
10 changes: 6 additions & 4 deletions dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@

namespace DB
{
template <class StreamWriterPtr>
template <class ExchangeWriterPtr>
class BroadcastOrPassThroughWriter : public DAGResponseWriter
{
public:
BroadcastOrPassThroughWriter(
StreamWriterPtr writer_,
ExchangeWriterPtr writer_,
Int64 batch_send_min_limit_,
bool should_send_exec_summary_at_last,
DAGContext & dag_context_);
Expand All @@ -36,12 +36,14 @@ class BroadcastOrPassThroughWriter : public DAGResponseWriter
void finishWrite() override;

private:
template <bool send_exec_summary_at_last>
void encodeThenWriteBlocks();

void sendExecutionSummary();

private:
Int64 batch_send_min_limit;
bool should_send_exec_summary_at_last;
StreamWriterPtr writer;
ExchangeWriterPtr writer;
std::vector<Block> blocks;
size_t rows_in_blocks;
std::unique_ptr<ChunkCodecStream> chunk_codec_stream;
Expand Down
Loading