Skip to content

Commit

Permalink
refine remote execution summary (#6349)
Browse files Browse the repository at this point in the history
ref #5900
  • Loading branch information
SeaRise authored Dec 22, 2022
1 parent 0866058 commit ba882a3
Show file tree
Hide file tree
Showing 13 changed files with 379 additions and 216 deletions.
88 changes: 9 additions & 79 deletions dbms/src/DataStreams/TiRemoteBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <Flash/Coprocessor/ChunkDecodeAndSquash.h>
#include <Flash/Coprocessor/CoprocessorReader.h>
#include <Flash/Coprocessor/ExecutionSummary.h>
#include <Flash/Coprocessor/GenSchemaAndColumn.h>
#include <Flash/Coprocessor/RemoteExecutionSummary.h>
#include <Flash/Mpp/ExchangeReceiver.h>
#include <Flash/Statistics/ConnectionProfileInfo.h>
#include <Interpreters/Context.h>
Expand Down Expand Up @@ -50,15 +50,10 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream

String name;

/// this atomic variable is kind of a lock for the struct of execution_summaries:
/// if execution_summaries_inited[index] = true, the map execution_summaries[index]
/// itself will not be modified, so ExecutionSummaryCollector can read it safely, otherwise,
/// ExecutionSummaryCollector will just skip execution_summaries[index]
std::vector<std::atomic<bool>> execution_summaries_inited;
std::vector<std::unordered_map<String, ExecutionSummary>> execution_summaries;

const LoggerPtr log;

RemoteExecutionSummary remote_execution_summary;

uint64_t total_rows;

// For fine grained shuffle, sender will partition data into multiple streams by hashing.
Expand All @@ -68,64 +63,6 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream

std::unique_ptr<CHBlockChunkDecodeAndSquash> decoder_ptr;

/// Builds the summary map of source `index` from a response that carries execution summaries.
///
/// Every summary in `resp` that has an executor id is stored in execution_summaries[index]
/// under that id; summaries without an executor id are skipped.
/// execution_summaries_inited[index] is set to true only after the whole response
/// has been processed.
void initRemoteExecutionSummaries(tipb::SelectResponse & resp, size_t index)
{
    for (const auto & pb_summary : resp.execution_summaries())
    {
        if (!likely(pb_summary.has_executor_id()))
            continue;

        // operator[] creates the entry for this executor on first use.
        auto & target = execution_summaries[index][pb_summary.executor_id()];
        target.time_processed_ns = pb_summary.time_processed_ns();
        target.num_produced_rows = pb_summary.num_produced_rows();
        target.num_iterations = pb_summary.num_iterations();
        target.concurrency = pb_summary.concurrency();

        // The scan context arrives in serialized form: decode it into a temporary, then merge it in.
        DM::ScanContext decoded_scan_context;
        decoded_scan_context.deserialize(pb_summary.tiflash_scan_context());
        target.scan_context->merge(decoded_scan_context);
    }
    execution_summaries_inited[index].store(true);
}

/// Folds the execution summaries carried by `resp` into execution_summaries[index].
///
/// - A response without execution summaries is ignored.
/// - The first summary-carrying response for `index` is handed to initRemoteExecutionSummaries().
/// - Later responses are accumulated per executor id: time_processed_ns keeps the maximum,
///   num_produced_rows / num_iterations / concurrency are summed, scan contexts are merged.
///   An executor id that was not registered by the first response is logged and skipped.
///
/// Throws Exception when is_streaming_reader is true and a second summary-carrying
/// response arrives for the same `index`.
void addRemoteExecutionSummaries(tipb::SelectResponse & resp, size_t index)
{
    if (unlikely(resp.execution_summaries_size() == 0))
        return;

    if (!execution_summaries_inited[index].load())
    {
        initRemoteExecutionSummaries(resp, index);
        return;
    }
    if constexpr (is_streaming_reader)
        throw Exception(
            fmt::format(
                "There are more than one execution summary packet of index {} in streaming reader, "
                "this should not happen",
                index));
    auto & execution_summaries_map = execution_summaries[index];
    for (const auto & execution_summary : resp.execution_summaries())
    {
        if (!likely(execution_summary.has_executor_id()))
            continue;

        const auto & executor_id = execution_summary.executor_id();
        // Single lookup: the iterator is reused below instead of a second operator[] search.
        auto found = execution_summaries_map.find(executor_id);
        if (unlikely(found == execution_summaries_map.end()))
        {
            LOG_WARNING(log, "execution {} not found in execution_summaries, this should not happen", executor_id);
            continue;
        }
        auto & accumulated = found->second;
        accumulated.time_processed_ns = std::max(accumulated.time_processed_ns, execution_summary.time_processed_ns());
        accumulated.num_produced_rows += execution_summary.num_produced_rows();
        accumulated.num_iterations += execution_summary.num_iterations();
        accumulated.concurrency += execution_summary.concurrency();

        // The scan context arrives in serialized form: decode it into a temporary, then merge it in.
        DM::ScanContext scan_context;
        scan_context.deserialize(execution_summary.tiflash_scan_context());
        accumulated.scan_context->merge(scan_context);
    }
}

bool fetchRemoteResult()
{
while (true)
Expand All @@ -147,14 +84,13 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
throw Exception(result.resp->error().DebugString());
}

size_t index = 0;
if constexpr (is_streaming_reader)
index = result.call_index;

/// only the last response contains execution summaries
if (result.resp != nullptr)
addRemoteExecutionSummaries(*result.resp, index);
remote_execution_summary.add(*result.resp);

size_t index = 0;
if constexpr (is_streaming_reader)
index = result.call_index;
const auto & decode_detail = result.decode_detail;
auto & connection_profile_info = connection_profile_infos[index];
connection_profile_info.packets += decode_detail.packets;
Expand All @@ -179,16 +115,10 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
: remote_reader(remote_reader_)
, source_num(remote_reader->getSourceNum())
, name(fmt::format("TiRemote({})", RemoteReader::name))
, execution_summaries_inited(source_num)
, log(Logger::get(name, req_id, executor_id))
, total_rows(0)
, stream_id(stream_id_)
{
for (size_t i = 0; i < source_num; ++i)
{
execution_summaries_inited[i].store(false);
}
execution_summaries.resize(source_num);
connection_profile_infos.resize(source_num);
sample_block = Block(getColumnWithTypeAndName(toNamesAndTypes(remote_reader->getOutputSchema())));
static constexpr size_t squash_rows_limit = 8192;
Expand Down Expand Up @@ -228,9 +158,9 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
return block;
}

const std::unordered_map<String, ExecutionSummary> * getRemoteExecutionSummaries(size_t index)
const RemoteExecutionSummary & getRemoteExecutionSummary()
{
return execution_summaries_inited[index].load() ? &execution_summaries[index] : nullptr;
return remote_execution_summary;
}

size_t getTotalRows() const { return total_rows; }
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ class DAGContext
, dummy_query_string(dag_request->DebugString())
, dummy_ast(makeDummyQuery())
, initialize_concurrency(concurrency)
, collect_execution_summaries(dag_request->has_collect_execution_summaries() && dag_request->collect_execution_summaries())
, is_mpp_task(true)
, is_root_mpp_task(false)
, log(Logger::get(log_identifier))
Expand Down
40 changes: 23 additions & 17 deletions dbms/src/Flash/Coprocessor/ExecutionSummary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,30 @@

namespace DB
{
/// Accumulates `other` into this summary:
/// time_processed_ns keeps the larger of the two values, the three counters are summed,
/// and the scan contexts are merged.
void ExecutionSummary::merge(const ExecutionSummary & other)
{
    // Processing time is not additive here: keep the maximum.
    time_processed_ns = std::max(other.time_processed_ns, time_processed_ns);

    num_produced_rows = num_produced_rows + other.num_produced_rows;
    num_iterations = num_iterations + other.num_iterations;
    concurrency = concurrency + other.concurrency;

    const auto & other_scan_context = *other.scan_context;
    scan_context->merge(other_scan_context);
}

/// Accumulates a protobuf execution summary into this summary:
/// time_processed_ns keeps the larger of the two values, the three counters are summed,
/// and the protobuf's tiflash_scan_context is merged into scan_context.
void ExecutionSummary::merge(const tipb::ExecutorExecutionSummary & other)
{
    // Processing time is not additive here: keep the maximum.
    time_processed_ns = std::max(other.time_processed_ns(), time_processed_ns);

    num_produced_rows = num_produced_rows + other.num_produced_rows();
    num_iterations = num_iterations + other.num_iterations();
    concurrency = concurrency + other.concurrency();

    const auto & pb_scan_context = other.tiflash_scan_context();
    scan_context->merge(pb_scan_context);
}

void ExecutionSummary::merge(const ExecutionSummary & other, bool streaming_call)
void ExecutionSummary::init(const tipb::ExecutorExecutionSummary & other)
{
if (streaming_call)
{
time_processed_ns = std::max(time_processed_ns, other.time_processed_ns);
num_produced_rows = std::max(num_produced_rows, other.num_produced_rows);
num_iterations = std::max(num_iterations, other.num_iterations);
concurrency = std::max(concurrency, other.concurrency);
scan_context->merge(*other.scan_context);
}
else
{
time_processed_ns = std::max(time_processed_ns, other.time_processed_ns);
num_produced_rows += other.num_produced_rows;
num_iterations += other.num_iterations;
concurrency += other.concurrency;
scan_context->merge(*other.scan_context);
}
time_processed_ns = other.time_processed_ns();
num_produced_rows = other.num_produced_rows();
num_iterations = other.num_iterations();
concurrency = other.concurrency();
scan_context->deserialize(other.tiflash_scan_context());
}
} // namespace DB
7 changes: 5 additions & 2 deletions dbms/src/Flash/Coprocessor/ExecutionSummary.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <Storages/DeltaMerge/ScanContext.h>
#include <common/types.h>
#include <tipb/select.pb.h>

#include <memory>

Expand All @@ -29,11 +30,13 @@ struct ExecutionSummary
UInt64 num_iterations = 0;
UInt64 concurrency = 0;

std::unique_ptr<DB::DM::ScanContext> scan_context = std::make_unique<DB::DM::ScanContext>();
DM::ScanContextPtr scan_context = std::make_shared<DB::DM::ScanContext>();

ExecutionSummary() = default;

void merge(const ExecutionSummary & other, bool streaming_call);
void merge(const ExecutionSummary & other);
void merge(const tipb::ExecutorExecutionSummary & other);
void init(const tipb::ExecutorExecutionSummary & other);
};

} // namespace DB
Loading

0 comments on commit ba882a3

Please sign in to comment.