Fix execute details regression after merge master #678

Merged · 4 commits · May 5, 2020
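No PR description survives in this capture. Judging from the diff alone, the regression concerns execution summaries (the "execute details" TiDB asks for): the change deletes the templated streaming/non-streaming logic inside DAGBlockOutputStream and delegates chunk encoding and summary collection to the shared DAGResponseWriter<false>, so both coprocessor paths report details through one code path again.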
Changes from all commits
238 changes: 11 additions & 227 deletions dbms/src/Flash/Coprocessor/DAGBlockOutputStream.cpp
@@ -1,243 +1,27 @@
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Flash/Coprocessor/ArrowChunkCodec.h>
#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <Flash/Coprocessor/DAGBlockOutputStream.h>
#include <Flash/Coprocessor/DefaultChunkCodec.h>

namespace DB
{

namespace ErrorCodes
{
extern const int UNSUPPORTED_PARAMETER;
extern const int LOGICAL_ERROR;
} // namespace ErrorCodes

template <>
DAGBlockOutputStream<false>::DAGBlockOutputStream(tipb::SelectResponse * dag_response_, Int64 records_per_chunk_,
tipb::EncodeType encode_type_, std::vector<tipb::FieldType> result_field_types_, Block && header_, DAGContext & dag_context_,
bool collect_execute_summary_)
: dag_response(dag_response_),
result_field_types(std::move(result_field_types_)),
header(std::move(header_)),
records_per_chunk(records_per_chunk_),
encode_type(encode_type_),
current_records_num(0),
dag_context(dag_context_),
collect_execute_summary(collect_execute_summary_)
{
previous_execute_stats.resize(dag_context.profile_streams_map.size(), std::make_tuple(0, 0, 0));
if (encode_type == tipb::EncodeType::TypeDefault)
{
chunk_codec_stream = std::make_unique<DefaultChunkCodec>()->newCodecStream(result_field_types);
}
else if (encode_type == tipb::EncodeType::TypeChunk)
{
chunk_codec_stream = std::make_unique<ArrowChunkCodec>()->newCodecStream(result_field_types);
}
else if (encode_type == tipb::EncodeType::TypeCHBlock)
{
chunk_codec_stream = std::make_unique<CHBlockChunkCodec>()->newCodecStream(result_field_types);
records_per_chunk = -1;
}
else
{
throw Exception("Only Default and Arrow encode type is supported in DAGBlockOutputStream.", ErrorCodes::UNSUPPORTED_PARAMETER);
}
dag_response->set_encode_type(encode_type);
}
+DAGBlockOutputStream::DAGBlockOutputStream(tipb::SelectResponse * dag_response, Int64 records_per_chunk, tipb::EncodeType encode_type,
+std::vector<tipb::FieldType> result_field_types, Block && header_, DAGContext & dag_context, bool collect_execute_summary,
+bool return_executor_id)
+: header(std::move(header_)),
+response_writer(dag_response, nullptr, records_per_chunk, encode_type, result_field_types, dag_context, collect_execute_summary,
+return_executor_id)
+{}
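Note the nullptr passed as the second argument of response_writer: the unary coprocessor path has no gRPC StreamWriter, so DAGResponseWriter<false> presumably encodes chunks straight into dag_response. Its actual signature lives in DAGResponseWriter.h, which is not part of this diff.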

template <>
DAGBlockOutputStream<true>::DAGBlockOutputStream(BlockInputStreamPtr input_, StreamWriterPtr writer_, Int64 records_per_chunk_,
tipb::EncodeType encode_type_, std::vector<tipb::FieldType> result_field_types_, Block && header_, DAGContext & dag_context_,
bool collect_execute_summary_)
: finished(false),
writer(writer_),
result_field_types(std::move(result_field_types_)),
header(std::move(header_)),
records_per_chunk(records_per_chunk_),
encode_type(encode_type_),
current_records_num(0),
dag_context(dag_context_),
collect_execute_summary(collect_execute_summary_)
{
previous_execute_stats.resize(dag_context.profile_streams_map.size(), std::make_tuple(0, 0, 0));
if (encode_type == tipb::EncodeType::TypeDefault)
{
chunk_codec_stream = std::make_unique<DefaultChunkCodec>()->newCodecStream(result_field_types);
}
else if (encode_type == tipb::EncodeType::TypeChunk)
{
chunk_codec_stream = std::make_unique<ArrowChunkCodec>()->newCodecStream(result_field_types);
}
else if (encode_type == tipb::EncodeType::TypeCHBlock)
{
chunk_codec_stream = std::make_unique<CHBlockChunkCodec>()->newCodecStream(result_field_types);
records_per_chunk = -1;
}
else
{
throw Exception(
"Only Default and Arrow encode type is supported in StreamingDAGBlockOutputStream.", ErrorCodes::UNSUPPORTED_PARAMETER);
}
children.push_back(input_);
}
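Both of the deleted constructors repeat the same encode-type dispatch. As a minimal standalone sketch (a hypothetical makeCodecStream helper, not part of the patch, reusing the codec classes included at the top of this file), the selection logic is:

```cpp
// Hypothetical helper, not in the patch: pick a codec stream for the
// encode type negotiated with TiDB. Mirrors the branches above; the real
// code also forces records_per_chunk = -1 for TypeCHBlock, which always
// ships whole blocks.
std::unique_ptr<ChunkCodecStream> makeCodecStream(
    tipb::EncodeType encode_type, const std::vector<tipb::FieldType> & field_types)
{
    switch (encode_type)
    {
        case tipb::EncodeType::TypeDefault: // row-oriented TiDB default encoding
            return std::make_unique<DefaultChunkCodec>()->newCodecStream(field_types);
        case tipb::EncodeType::TypeChunk: // Arrow-compatible columnar chunks
            return std::make_unique<ArrowChunkCodec>()->newCodecStream(field_types);
        case tipb::EncodeType::TypeCHBlock: // native ClickHouse block format
            return std::make_unique<CHBlockChunkCodec>()->newCodecStream(field_types);
        default:
            throw Exception("Unsupported encode type", ErrorCodes::UNSUPPORTED_PARAMETER);
    }
}
```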

-template <bool streaming>
-void DAGBlockOutputStream<streaming>::writePrefix()
+void DAGBlockOutputStream::writePrefix()
{
//something to do here?
}

template <>
void DAGBlockOutputStream<true>::readPrefix()
{
children.back()->readPrefix();
}

template <>
void DAGBlockOutputStream<false>::encodeChunkToDAGResponse()
{
auto dag_chunk = dag_response->add_chunks();
dag_chunk->set_rows_data(chunk_codec_stream->getString());
chunk_codec_stream->clear();
current_records_num = 0;
}

template <>
Block DAGBlockOutputStream<true>::readImpl()
{
if (finished)
return {};
while (Block block = children.back()->read())
{
if (!block)
{
finished = true;
return {};
}
write(block);
}
return {};
}
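Two things worth noting in the deleted readImpl above: it is a pump that drains its child and forwards every block through write(), and its inner if (!block) branch is unreachable, since the while condition already rejects empty blocks (so finished was never actually set there). A minimal sketch of the drain loop it implemented, as a hypothetical free function:

```cpp
// Drain a child stream, forwarding every non-empty block to a sink.
// An empty Block signals end-of-stream, both from the child and to our caller.
template <typename Sink>
Block drainAndForward(IBlockInputStream & child, Sink && sink)
{
    while (Block block = child.read())
        sink(block); // block is always non-empty inside the loop
    return {};
}
```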
+void DAGBlockOutputStream::write(const Block & block) { response_writer.write(block); }

template <bool streaming>
void DAGBlockOutputStream<streaming>::addExecuteSummaries(tipb::SelectResponse * response)
{
if (!collect_execute_summary)
return;
// add ExecutorExecutionSummary info
for (auto & p : dag_context.profile_streams_map)
{
auto * executeSummary = response->add_execution_summaries();
UInt64 time_processed_ns = 0;
UInt64 num_produced_rows = 0;
UInt64 num_iterations = 0;
for (auto & streamPtr : p.second.input_streams)
{
if (auto * p_stream = dynamic_cast<IProfilingBlockInputStream *>(streamPtr.get()))
{
time_processed_ns = std::max(time_processed_ns, p_stream->getProfileInfo().execution_time);
num_produced_rows += p_stream->getProfileInfo().rows;
num_iterations += p_stream->getProfileInfo().blocks;
}
}
for (auto & join_alias : dag_context.qb_id_to_join_alias_map[p.second.qb_id])
{
if (dag_context.profile_streams_map_for_join_build_side.find(join_alias)
!= dag_context.profile_streams_map_for_join_build_side.end())
{
UInt64 process_time_for_build = 0;
for (auto & join_stream : dag_context.profile_streams_map_for_join_build_side[join_alias])
{
if (auto * p_stream = dynamic_cast<IProfilingBlockInputStream *>(join_stream.get()))
process_time_for_build = std::max(process_time_for_build, p_stream->getProfileInfo().execution_time);
}
time_processed_ns += process_time_for_build;
}
}
executeSummary->set_time_processed_ns(time_processed_ns);
executeSummary->set_num_produced_rows(num_produced_rows);
executeSummary->set_num_iterations(num_iterations);
if constexpr (streaming)
executeSummary->set_executor_id(p.first);
}
}
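The aggregation rule above is easy to misread: across the parallel streams of one executor, execution time is combined with max (the streams run concurrently) while produced rows and iterations are summed, and a join's build-side time is then added on top (the build runs before the probe). A self-contained example with made-up numbers:

```cpp
#include <algorithm>
#include <cstdint>

// Four parallel probe streams plus one join build side, times in ms for
// readability. Concurrent streams combine with max; the build phase is
// sequential with the probe, so its time is added.
uint64_t exampleTimeProcessed()
{
    uint64_t probe_ms[] = {10, 12, 9, 11};
    uint64_t t = *std::max_element(std::begin(probe_ms), std::end(probe_ms)); // 12
    return t + 5; // + build side => 17, not 10 + 12 + 9 + 11 + 5
}
```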

template <>
void DAGBlockOutputStream<true>::encodeChunkToDAGResponse()
{
::coprocessor::BatchResponse resp;

tipb::SelectResponse stream_dag_response;
stream_dag_response.set_encode_type(encode_type);
auto dag_chunk = stream_dag_response.add_chunks();
dag_chunk->set_rows_data(chunk_codec_stream->getString());
chunk_codec_stream->clear();
current_records_num = 0;
addExecuteSummaries(&stream_dag_response);
std::string dag_data;
stream_dag_response.SerializeToString(&dag_data);
resp.set_data(dag_data);

writer->write(resp);
}
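For contrast with the unary path (which appends chunks to one shared SelectResponse), each streaming packet above is a complete SelectResponse of its own, serialized into the opaque data field of a coprocessor::BatchResponse. Restated as a sketch with heavier comments, using the same member names as the deleted code:

```cpp
// One gRPC packet = one self-contained SelectResponse: its own encode
// type, one encoded chunk, and fresh execution summaries.
tipb::SelectResponse one_packet;
one_packet.set_encode_type(encode_type);
one_packet.add_chunks()->set_rows_data(chunk_codec_stream->getString());
std::string payload;
one_packet.SerializeToString(&payload); // protobuf-in-protobuf framing
::coprocessor::BatchResponse resp;
resp.set_data(payload);
writer->write(resp); // TiDB decodes each packet independently
```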

-template <bool streaming>
-void DAGBlockOutputStream<streaming>::writeSuffix()
+void DAGBlockOutputStream::writeSuffix()
{
-// todo error handle
-if (current_records_num > 0)
-{
-encodeChunkToDAGResponse();
-}
-if constexpr (!streaming)
-{
-addExecuteSummaries(dag_response);
-}
+response_writer.finishWrite();
}

template <bool streaming>
void DAGBlockOutputStream<streaming>::write(const Block & block)
{
if (block.columns() != result_field_types.size())
throw Exception("Output column size mismatch with field type size", ErrorCodes::LOGICAL_ERROR);
if (records_per_chunk == -1)
{
current_records_num = 0;
if (block.rows() > 0)
{
chunk_codec_stream->encode(block, 0, block.rows());
encodeChunkToDAGResponse();
}
}
else
{
size_t rows = block.rows();
for (size_t row_index = 0; row_index < rows;)
{
if (current_records_num >= records_per_chunk)
{
encodeChunkToDAGResponse();
}
const size_t upper = std::min(row_index + (records_per_chunk - current_records_num), rows);
chunk_codec_stream->encode(block, row_index, upper);
current_records_num += (upper - row_index);
row_index = upper;
}
}
}
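The slicing loop above never emits a chunk larger than records_per_chunk and carries any remainder over to the next write() (or to writeSuffix()). A self-contained rerun of the arithmetic with made-up sizes:

```cpp
#include <algorithm>
#include <cstdio>

// records_per_chunk = 1024, a single 2500-row block, empty buffer:
// emits [0,1024) and [1024,2048) as full chunks, then buffers 452 rows.
int main()
{
    long per_chunk = 1024, rows = 2500, buffered = 0;
    for (long row_index = 0; row_index < rows;)
    {
        if (buffered >= per_chunk)
            buffered = 0; // encodeChunkToDAGResponse() would flush here
        long upper = std::min(row_index + (per_chunk - buffered), rows);
        std::printf("encode rows [%ld, %ld)\n", row_index, upper);
        buffered += upper - row_index;
        row_index = upper;
    }
    std::printf("rows left buffered: %ld\n", buffered); // 452
}
```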

template <>
void DAGBlockOutputStream<true>::readSuffix()
{
// todo error handle
if (current_records_num > 0)
{
encodeChunkToDAGResponse();
}
children.back()->readSuffix();
}


} // namespace DB
31 changes: 6 additions & 25 deletions dbms/src/Flash/Coprocessor/DAGBlockOutputStream.h
@@ -8,49 +8,30 @@
#include <common/logger_useful.h>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
+#include <Flash/Coprocessor/DAGResponseWriter.h>
#include <tipb/select.pb.h>

#pragma GCC diagnostic pop

namespace DB
{

/// Serializes the stream of blocks in TiDB DAG response format.
-template <bool streaming>
-class DAGBlockOutputStream : public std::conditional_t<streaming, IProfilingBlockInputStream, IBlockOutputStream>
+class DAGBlockOutputStream : public IBlockOutputStream
{
public:
DAGBlockOutputStream(tipb::SelectResponse * response_, Int64 records_per_chunk_, tipb::EncodeType encodeType_,
-std::vector<tipb::FieldType> result_field_types, Block && header_, DAGContext & dag_context_, bool collect_execute_summary_);
-
-DAGBlockOutputStream(BlockInputStreamPtr input_, StreamWriterPtr writer, Int64 records_per_chunk_, tipb::EncodeType encodeType_,
-std::vector<tipb::FieldType> result_field_types, Block && header_, DAGContext & dag_context_, bool collect_execute_summary_);
-
+std::vector<tipb::FieldType> result_field_types, Block && header_, DAGContext & dag_context_, bool collect_execute_summary_,
+bool return_executor_id);

Block getHeader() const { return header; }
String getName() const { return "StreamingWriter"; }
void write(const Block & block);
void writePrefix();
void writeSuffix();
-Block readImpl();
-void readPrefix();
-void readSuffix();
-
-void encodeChunkToDAGResponse();
-void addExecuteSummaries(tipb::SelectResponse * dag_response);

private:
-bool finished;
-tipb::SelectResponse * dag_response;
-StreamWriterPtr writer;
-std::vector<tipb::FieldType> result_field_types;
Block header;
-Int64 records_per_chunk;
-tipb::EncodeType encode_type;
-std::unique_ptr<ChunkCodecStream> chunk_codec_stream;
-Int64 current_records_num;
-DAGContext & dag_context;
-bool collect_execute_summary;
-std::vector<std::tuple<UInt64, UInt64, UInt64>> previous_execute_stats;
+DAGResponseWriter<false> response_writer;
};

} // namespace DB
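The deleted template picked its base class at compile time with std::conditional_t; after this PR that streaming/unary split lives only in DAGResponseWriter<streaming>. For reference, a self-contained illustration of the technique (Pull and Push stand in for IProfilingBlockInputStream and IBlockOutputStream):

```cpp
#include <type_traits>

struct Pull {}; // stands in for IProfilingBlockInputStream
struct Push {}; // stands in for IBlockOutputStream

template <bool streaming>
struct Stream : std::conditional_t<streaming, Pull, Push> {};

static_assert(std::is_base_of_v<Pull, Stream<true>>);
static_assert(std::is_base_of_v<Push, Stream<false>>);
```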
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
@@ -71,8 +71,8 @@ try
{
bool collect_exec_summary = dag_request.has_collect_execution_summaries() && dag_request.collect_execution_summaries();
BlockOutputStreamPtr dag_output_stream
-= std::make_shared<DAGBlockOutputStream<false>>(dag_response, context.getSettings().dag_records_per_chunk, dag.getEncodeType(),
-dag.getResultFieldTypes(), streams.in->getHeader(), dag_context, collect_exec_summary);
+= std::make_shared<DAGBlockOutputStream>(dag_response, context.getSettings().dag_records_per_chunk, dag.getEncodeType(),
+dag.getResultFieldTypes(), streams.in->getHeader(), dag_context, collect_exec_summary, dag_request.has_root_executor());
copyData(*streams.in, *dag_output_stream);
}
else
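A closing, hedged note on the call-site change: dag_request.has_root_executor() is forwarded as the new return_executor_id flag. In tipb a DAG request carries its plan either as a flat executor list or as a root_executor tree; tree-shaped requests name executors with string IDs, so execution summaries presumably must carry executor_id to be matched up, while list-shaped requests are matched positionally, which would explain why only one mode sets the IDs.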