Skip to content

Commit

Permalink
Split sending execution summary from Streaming/ExchangeWriter (#6335)
Browse files Browse the repository at this point in the history
ref #5900
  • Loading branch information
SeaRise authored Nov 22, 2022
1 parent 7203151 commit f121236
Show file tree
Hide file tree
Showing 25 changed files with 69 additions and 232 deletions.
6 changes: 3 additions & 3 deletions dbms/src/DataStreams/TiRemoteBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <Flash/Coprocessor/ChunkDecodeAndSquash.h>
#include <Flash/Coprocessor/CoprocessorReader.h>
#include <Flash/Coprocessor/DAGResponseWriter.h>
#include <Flash/Coprocessor/ExecutionSummary.h>
#include <Flash/Coprocessor/GenSchemaAndColumn.h>
#include <Flash/Mpp/ExchangeReceiver.h>
#include <Flash/Statistics/ConnectionProfileInfo.h>
Expand Down Expand Up @@ -51,8 +51,8 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream

/// this atomic variable is kind of a lock for the struct of execution_summaries:
/// if execution_summaries_inited[index] = true, the map execution_summaries[index]
/// itself will not be modified, so DAGResponseWriter can read it safely, otherwise,
/// DAGResponseWriter will just skip execution_summaries[index]
/// itself will not be modified, so ExecutionSummaryCollector can read it safely, otherwise,
/// ExecutionSummaryCollector will just skip execution_summaries[index]
std::vector<std::atomic<bool>> execution_summaries_inited;
std::vector<std::unordered_map<String, ExecutionSummary>> execution_summaries;

Expand Down
8 changes: 7 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <DataStreams/copyData.h>
#include <Flash/Coprocessor/DAGBlockOutputStream.h>
#include <Flash/Coprocessor/DAGDriver.h>
#include <Flash/Coprocessor/ExecutionSummaryCollector.h>
#include <Flash/Coprocessor/StreamWriter.h>
#include <Flash/Coprocessor/StreamingDAGResponseWriter.h>
#include <Flash/Coprocessor/UnaryDAGResponseWriter.h>
Expand Down Expand Up @@ -132,10 +133,15 @@ try
streaming_writer,
context.getSettingsRef().dag_records_per_chunk,
context.getSettingsRef().batch_send_min_limit,
dag_context.collect_execution_summaries,
dag_context);
dag_output_stream = std::make_shared<DAGBlockOutputStream>(streams.in->getHeader(), std::move(response_writer));
copyData(*streams.in, *dag_output_stream);
if (dag_context.collect_execution_summaries)
{
ExecutionSummaryCollector summary_collector(dag_context);
auto execution_summary_response = summary_collector.genExecutionSummaryResponse();
streaming_writer->write(execution_summary_response);
}
}

if (auto throughput = dag_context.getTableScanThroughput(); throughput.first)
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,6 @@ void DAGQueryBlockInterpreter::handleExchangeSender(DAGPipeline & pipeline)
const auto & exchange_sender = query_block.exchange_sender->exchange_sender();
std::vector<Int64> partition_col_ids = ExchangeSenderInterpreterHelper::genPartitionColIds(exchange_sender);
TiDB::TiDBCollators partition_col_collators = ExchangeSenderInterpreterHelper::genPartitionColCollators(exchange_sender);
int stream_id = 0;
const uint64_t stream_count = query_block.exchange_sender->fine_grained_shuffle_stream_count();
const uint64_t batch_size = query_block.exchange_sender->fine_grained_shuffle_batch_size();

Expand All @@ -779,7 +778,6 @@ void DAGQueryBlockInterpreter::handleExchangeSender(DAGPipeline & pipeline)
exchange_sender.tp(),
context.getSettingsRef().dag_records_per_chunk,
context.getSettingsRef().batch_send_min_limit,
stream_id++ == 0, /// only one stream needs to sending execution summaries for the last response
dagContext(),
enable_fine_grained_shuffle,
stream_count,
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ DAGResponseWriter::DAGResponseWriter(
Int64 records_per_chunk_,
DAGContext & dag_context_)
: records_per_chunk(records_per_chunk_)
, summary_collector(dag_context_)
, dag_context(dag_context_)
{
if (dag_context.encode_type == tipb::EncodeType::TypeCHBlock)
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Flash/Coprocessor/DAGResponseWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

#pragma once

#include <Flash/Coprocessor/ExecutionSummaryCollector.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <tipb/select.pb.h>

namespace DB
Expand All @@ -36,7 +36,6 @@ class DAGResponseWriter

protected:
Int64 records_per_chunk;
ExecutionSummaryCollector summary_collector;
DAGContext & dag_context;
};

Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ void mergeRemoteExecuteSummaries(
}
}

tipb::SelectResponse ExecutionSummaryCollector::genExecutionSummaryResponse()
{
tipb::SelectResponse response;
addExecuteSummaries(response);
return response;
}

void ExecutionSummaryCollector::addExecuteSummaries(tipb::SelectResponse & response)
{
if (!dag_context.collect_execution_summaries)
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class ExecutionSummaryCollector

void addExecuteSummaries(tipb::SelectResponse & response);

tipb::SelectResponse genExecutionSummaryResponse();

private:
void fillTiExecutionSummary(
tipb::ExecutorExecutionSummary * execution_summary,
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/RemoteRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ RemoteRequest RemoteRequest::build(
dag_req.set_force_encode_type(true);
}
/// do not collect execution summaries because in this case because the execution summaries
/// will be collected by CoprocessorBlockInputStream
/// will be collected by CoprocessorBlockInputStream.
/// Otherwise rows in execution summary of table scan will be double.
dag_req.set_collect_execution_summaries(false);
const auto & original_dag_req = *dag_context.dag_request;
if (original_dag_req.has_time_zone_name() && !original_dag_req.time_zone_name().empty())
Expand Down
12 changes: 0 additions & 12 deletions dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,9 @@ StreamingDAGResponseWriter<StreamWriterPtr>::StreamingDAGResponseWriter(
StreamWriterPtr writer_,
Int64 records_per_chunk_,
Int64 batch_send_min_limit_,
bool should_send_exec_summary_at_last_,
DAGContext & dag_context_)
: DAGResponseWriter(records_per_chunk_, dag_context_)
, batch_send_min_limit(batch_send_min_limit_)
, should_send_exec_summary_at_last(should_send_exec_summary_at_last_)
, writer(writer_)
{
rows_in_blocks = 0;
Expand Down Expand Up @@ -66,16 +64,6 @@ template <class StreamWriterPtr>
void StreamingDAGResponseWriter<StreamWriterPtr>::finishWrite()
{
assert(0 == rows_in_blocks);
if (should_send_exec_summary_at_last)
sendExecutionSummary();
}

template <class StreamWriterPtr>
void StreamingDAGResponseWriter<StreamWriterPtr>::sendExecutionSummary()
{
tipb::SelectResponse response;
summary_collector.addExecuteSummaries(response);
writer->write(response);
}

template <class StreamWriterPtr>
Expand Down
4 changes: 0 additions & 4 deletions dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ class StreamingDAGResponseWriter : public DAGResponseWriter
StreamWriterPtr writer_,
Int64 records_per_chunk_,
Int64 batch_send_min_limit_,
bool should_send_exec_summary_at_last,
DAGContext & dag_context_);
void write(const Block & block) override;
void flush() override;
Expand All @@ -43,11 +42,8 @@ class StreamingDAGResponseWriter : public DAGResponseWriter
private:
void encodeThenWriteBlocks();

void sendExecutionSummary();

private:
Int64 batch_send_min_limit;
bool should_send_exec_summary_at_last; /// only one stream needs to sending execution summaries at last.
StreamWriterPtr writer;
std::vector<Block> blocks;
size_t rows_in_blocks;
Expand Down
8 changes: 7 additions & 1 deletion dbms/src/Flash/Coprocessor/UnaryDAGResponseWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <Flash/Coprocessor/ArrowChunkCodec.h>
#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <Flash/Coprocessor/DefaultChunkCodec.h>
#include <Flash/Coprocessor/ExecutionSummaryCollector.h>
#include <Flash/Coprocessor/UnaryDAGResponseWriter.h>

namespace DB
Expand Down Expand Up @@ -77,7 +78,12 @@ void UnaryDAGResponseWriter::finishWrite()
encodeChunkToDAGResponse();
}
appendWarningsToDAGResponse();
summary_collector.addExecuteSummaries(*dag_response);

if (dag_context.collect_execution_summaries)
{
ExecutionSummaryCollector summary_collector(dag_context);
summary_collector.addExecuteSummaries(*dag_response);
}
}

void UnaryDAGResponseWriter::write(const Block & block)
Expand Down
35 changes: 0 additions & 35 deletions dbms/src/Flash/Coprocessor/tests/gtest_streaming_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ try
mock_writer,
batch_send_min_limit,
batch_send_min_limit,
/*should_send_exec_summary_at_last=*/false,
*dag_context_ptr);
for (const auto & block : blocks)
dag_writer->write(block);
Expand Down Expand Up @@ -177,39 +176,5 @@ try
}
CATCH

TEST_F(TestStreamingWriter, testSendExecutionSummary)
try
{
std::vector<tipb::EncodeType> encode_types{
tipb::EncodeType::TypeDefault,
tipb::EncodeType::TypeChunk,
tipb::EncodeType::TypeCHBlock};
for (const auto & encode_type : encode_types)
{
dag_context_ptr->encode_type = encode_type;

std::vector<tipb::SelectResponse> write_report;
auto checker = [&write_report](tipb::SelectResponse & response) {
write_report.emplace_back(std::move(response));
};
auto mock_writer = std::make_shared<MockStreamWriter>(checker);

const size_t batch_send_min_limit = 5;
auto dag_writer = std::make_shared<StreamingDAGResponseWriter<std::shared_ptr<MockStreamWriter>>>(
mock_writer,
batch_send_min_limit,
batch_send_min_limit,
/*should_send_exec_summary_at_last=*/true,
*dag_context_ptr);
dag_writer->flush();
dag_writer->finishWrite();

// For `should_send_exec_summary_at_last = true`, there is at least one packet used to pass execution summary.
ASSERT_EQ(write_report.size(), 1);
ASSERT_EQ(write_report.back().chunks_size(), 0);
}
}
CATCH

} // namespace tests
} // namespace DB
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <DataStreams/TiRemoteBlockInputStream.h>
#include <DataTypes/DataTypesNumber.h>
#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <Flash/Coprocessor/ExecutionSummaryCollector.h>
#include <Interpreters/Context.h>
#include <Storages/Transaction/TiDB.h>
#include <TestUtils/ColumnGenerator.h>
Expand Down Expand Up @@ -87,7 +88,11 @@ struct MockWriter
tracked_packet->serializeByResponse(response);
queue->push(tracked_packet);
}
void sendExecutionSummary(tipb::SelectResponse & response) { write(response); }
void sendExecutionSummary(const tipb::SelectResponse & response)
{
tipb::SelectResponse tmp = response;
write(tmp);
}
uint16_t getPartitionNum() const { return 1; }

PacketQueuePtr queue;
Expand Down Expand Up @@ -295,15 +300,18 @@ class TestTiRemoteBlockInputStream : public testing::Test
auto dag_writer = std::make_shared<BroadcastOrPassThroughWriter<MockWriterPtr>>(
writer,
batch_send_min_limit,
/*should_send_exec_summary_at_last=*/true,
*dag_context_ptr);

// 2. encode all blocks
for (const auto & block : source_blocks)
dag_writer->write(block);
dag_writer->flush();
writer->add_summary = true;
dag_writer->finishWrite();

// 3. send execution summary
writer->add_summary = true;
ExecutionSummaryCollector summary_collector(*dag_context_ptr);
writer->sendExecutionSummary(summary_collector.genExecutionSummaryResponse());
}

void prepareQueueV2(
Expand All @@ -319,15 +327,19 @@ class TestTiRemoteBlockInputStream : public testing::Test
writer,
0,
batch_send_min_limit,
/*should_send_exec_summary_at_last=*/true,
*dag_context_ptr);

// 2. encode all blocks
for (const auto & block : source_blocks)
dag_writer->write(block);
dag_writer->flush();
writer->add_summary = true;
dag_writer->finishWrite();

// 3. send execution summary
writer->add_summary = true;
ExecutionSummaryCollector summary_collector(*dag_context_ptr);
auto execution_summary_response = summary_collector.genExecutionSummaryResponse();
writer->write(execution_summary_response);
}

void checkChunkInResponse(
Expand Down
12 changes: 0 additions & 12 deletions dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@ template <class ExchangeWriterPtr>
BroadcastOrPassThroughWriter<ExchangeWriterPtr>::BroadcastOrPassThroughWriter(
ExchangeWriterPtr writer_,
Int64 batch_send_min_limit_,
bool should_send_exec_summary_at_last_,
DAGContext & dag_context_)
: DAGResponseWriter(/*records_per_chunk=*/-1, dag_context_)
, batch_send_min_limit(batch_send_min_limit_)
, should_send_exec_summary_at_last(should_send_exec_summary_at_last_)
, writer(writer_)
{
rows_in_blocks = 0;
Expand All @@ -39,16 +37,6 @@ template <class ExchangeWriterPtr>
void BroadcastOrPassThroughWriter<ExchangeWriterPtr>::finishWrite()
{
assert(0 == rows_in_blocks);
if (should_send_exec_summary_at_last)
sendExecutionSummary();
}

template <class ExchangeWriterPtr>
void BroadcastOrPassThroughWriter<ExchangeWriterPtr>::sendExecutionSummary()
{
tipb::SelectResponse response;
summary_collector.addExecuteSummaries(response);
writer->sendExecutionSummary(response);
}

template <class ExchangeWriterPtr>
Expand Down
4 changes: 0 additions & 4 deletions dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ class BroadcastOrPassThroughWriter : public DAGResponseWriter
BroadcastOrPassThroughWriter(
ExchangeWriterPtr writer_,
Int64 batch_send_min_limit_,
bool should_send_exec_summary_at_last,
DAGContext & dag_context_);
void write(const Block & block) override;
void flush() override;
Expand All @@ -38,11 +37,8 @@ class BroadcastOrPassThroughWriter : public DAGResponseWriter
private:
void encodeThenWriteBlocks();

void sendExecutionSummary();

private:
Int64 batch_send_min_limit;
bool should_send_exec_summary_at_last;
ExchangeWriterPtr writer;
std::vector<Block> blocks;
size_t rows_in_blocks;
Expand Down
12 changes: 0 additions & 12 deletions dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@ FineGrainedShuffleWriter<ExchangeWriterPtr>::FineGrainedShuffleWriter(
ExchangeWriterPtr writer_,
std::vector<Int64> partition_col_ids_,
TiDB::TiDBCollators collators_,
bool should_send_exec_summary_at_last_,
DAGContext & dag_context_,
uint64_t fine_grained_shuffle_stream_count_,
UInt64 fine_grained_shuffle_batch_size_)
: DAGResponseWriter(/*records_per_chunk=*/-1, dag_context_)
, should_send_exec_summary_at_last(should_send_exec_summary_at_last_)
, writer(writer_)
, partition_col_ids(std::move(partition_col_ids_))
, collators(std::move(collators_))
Expand All @@ -50,16 +48,6 @@ template <class ExchangeWriterPtr>
void FineGrainedShuffleWriter<ExchangeWriterPtr>::finishWrite()
{
assert(0 == rows_in_blocks);
if (should_send_exec_summary_at_last)
sendExecutionSummary();
}

template <class ExchangeWriterPtr>
void FineGrainedShuffleWriter<ExchangeWriterPtr>::sendExecutionSummary()
{
tipb::SelectResponse response;
summary_collector.addExecuteSummaries(response);
writer->sendExecutionSummary(response);
}

template <class ExchangeWriterPtr>
Expand Down
4 changes: 0 additions & 4 deletions dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ class FineGrainedShuffleWriter : public DAGResponseWriter
ExchangeWriterPtr writer_,
std::vector<Int64> partition_col_ids_,
TiDB::TiDBCollators collators_,
bool should_send_exec_summary_at_last,
DAGContext & dag_context_,
UInt64 fine_grained_shuffle_stream_count_,
UInt64 fine_grained_shuffle_batch_size);
Expand All @@ -46,10 +45,7 @@ class FineGrainedShuffleWriter : public DAGResponseWriter

void initScatterColumns();

void sendExecutionSummary();

private:
bool should_send_exec_summary_at_last;
ExchangeWriterPtr writer;
std::vector<Block> blocks;
std::vector<Int64> partition_col_ids;
Expand Down
Loading

0 comments on commit f121236

Please sign in to comment.