Skip to content

Commit

Permalink
Merge branch 'mx/mergeJoinOutBlocks' of github.com:mengxin9014/tiflas…
Browse files Browse the repository at this point in the history
…h into mx/mergeJoinOutBlocks
  • Loading branch information
mengxin9014 committed Dec 27, 2022
2 parents 6137530 + 5dac43e commit 74b6da0
Show file tree
Hide file tree
Showing 80 changed files with 1,541 additions and 1,079 deletions.
92 changes: 0 additions & 92 deletions dbms/src/DataStreams/LimitByBlockInputStream.cpp

This file was deleted.

52 changes: 0 additions & 52 deletions dbms/src/DataStreams/LimitByBlockInputStream.h

This file was deleted.

88 changes: 9 additions & 79 deletions dbms/src/DataStreams/TiRemoteBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <Flash/Coprocessor/ChunkDecodeAndSquash.h>
#include <Flash/Coprocessor/CoprocessorReader.h>
#include <Flash/Coprocessor/ExecutionSummary.h>
#include <Flash/Coprocessor/GenSchemaAndColumn.h>
#include <Flash/Coprocessor/RemoteExecutionSummary.h>
#include <Flash/Mpp/ExchangeReceiver.h>
#include <Flash/Statistics/ConnectionProfileInfo.h>
#include <Interpreters/Context.h>
Expand Down Expand Up @@ -50,15 +50,10 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream

String name;

/// this atomic variable is kind of a lock for the struct of execution_summaries:
/// if execution_summaries_inited[index] = true, the map execution_summaries[index]
/// itself will not be modified, so ExecutionSummaryCollector can read it safely, otherwise,
/// ExecutionSummaryCollector will just skip execution_summaries[index]
std::vector<std::atomic<bool>> execution_summaries_inited;
std::vector<std::unordered_map<String, ExecutionSummary>> execution_summaries;

const LoggerPtr log;

RemoteExecutionSummary remote_execution_summary;

uint64_t total_rows;

// For fine grained shuffle, sender will partition data into multiple streams by hashing.
Expand All @@ -68,64 +63,6 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream

std::unique_ptr<CHBlockChunkDecodeAndSquash> decoder_ptr;

/// Fills execution_summaries[index] from the execution summaries carried by `resp`
/// and then marks that slot as initialised.
///
/// Summaries without an executor id are ignored. For every other summary the
/// entry keyed by its executor id is (created if needed and) overwritten with the
/// values from the response; the scan context from the response is merged in.
void initRemoteExecutionSummaries(tipb::SelectResponse & resp, size_t index)
{
    auto & summaries_of_source = execution_summaries[index];
    for (const auto & incoming : resp.execution_summaries())
    {
        if (unlikely(!incoming.has_executor_id()))
            continue;

        auto & stored = summaries_of_source[incoming.executor_id()];
        stored.time_processed_ns = incoming.time_processed_ns();
        stored.num_produced_rows = incoming.num_produced_rows();
        stored.num_iterations = incoming.num_iterations();
        stored.concurrency = incoming.concurrency();

        DM::ScanContext decoded_scan_context;
        decoded_scan_context.deserialize(incoming.tiflash_scan_context());
        stored.scan_context->merge(decoded_scan_context);
    }
    // The flag is raised last, only after all entries for this index are in place.
    execution_summaries_inited[index].store(true);
}

/// Folds the execution summaries carried by `resp` into execution_summaries[index].
///
/// - A response without execution summaries is a no-op.
/// - The first response with summaries for `index` is handed to
///   initRemoteExecutionSummaries(), which creates the entries and sets the
///   inited flag.
/// - Any later response for the same `index` throws when is_streaming_reader is
///   true; otherwise it is merged into the existing entries: time_processed_ns
///   keeps the maximum, while num_produced_rows, num_iterations and concurrency
///   are summed and the scan context is merged.
///
/// Summaries without an executor id are ignored. A summary whose executor id has
/// no existing entry is logged and skipped; no entry is created for it.
void addRemoteExecutionSummaries(tipb::SelectResponse & resp, size_t index)
{
    if (unlikely(resp.execution_summaries_size() == 0))
        return;

    if (!execution_summaries_inited[index].load())
    {
        initRemoteExecutionSummaries(resp, index);
        return;
    }
    if constexpr (is_streaming_reader)
        throw Exception(
            fmt::format(
                "There are more than one execution summary packet of index {} in streaming reader, "
                "this should not happen",
                index));
    auto & execution_summaries_map = execution_summaries[index];
    for (const auto & execution_summary : resp.execution_summaries())
    {
        if (likely(execution_summary.has_executor_id()))
        {
            const auto & executor_id = execution_summary.executor_id();
            // Single lookup: reuse the iterator instead of find() followed by operator[],
            // which would hash the key twice and could insert on a miss.
            auto summary_it = execution_summaries_map.find(executor_id);
            if (unlikely(summary_it == execution_summaries_map.end()))
            {
                LOG_WARNING(log, "execution {} not found in execution_summaries, this should not happen", executor_id);
                continue;
            }
            auto & merged_summary = summary_it->second;
            merged_summary.time_processed_ns = std::max(merged_summary.time_processed_ns, execution_summary.time_processed_ns());
            merged_summary.num_produced_rows += execution_summary.num_produced_rows();
            merged_summary.num_iterations += execution_summary.num_iterations();
            merged_summary.concurrency += execution_summary.concurrency();
            DM::ScanContext scan_context;
            scan_context.deserialize(execution_summary.tiflash_scan_context());
            merged_summary.scan_context->merge(scan_context);
        }
    }
}

bool fetchRemoteResult()
{
while (true)
Expand All @@ -147,14 +84,13 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
throw Exception(result.resp->error().DebugString());
}

size_t index = 0;
if constexpr (is_streaming_reader)
index = result.call_index;

/// only the last response contains execution summaries
if (result.resp != nullptr)
addRemoteExecutionSummaries(*result.resp, index);
remote_execution_summary.add(*result.resp);

size_t index = 0;
if constexpr (is_streaming_reader)
index = result.call_index;
const auto & decode_detail = result.decode_detail;
auto & connection_profile_info = connection_profile_infos[index];
connection_profile_info.packets += decode_detail.packets;
Expand All @@ -179,16 +115,10 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
: remote_reader(remote_reader_)
, source_num(remote_reader->getSourceNum())
, name(fmt::format("TiRemote({})", RemoteReader::name))
, execution_summaries_inited(source_num)
, log(Logger::get(name, req_id, executor_id))
, total_rows(0)
, stream_id(stream_id_)
{
for (size_t i = 0; i < source_num; ++i)
{
execution_summaries_inited[i].store(false);
}
execution_summaries.resize(source_num);
connection_profile_infos.resize(source_num);
sample_block = Block(getColumnWithTypeAndName(toNamesAndTypes(remote_reader->getOutputSchema())));
static constexpr size_t squash_rows_limit = 8192;
Expand Down Expand Up @@ -228,9 +158,9 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
return block;
}

const std::unordered_map<String, ExecutionSummary> * getRemoteExecutionSummaries(size_t index)
const RemoteExecutionSummary & getRemoteExecutionSummary()
{
return execution_summaries_inited[index].load() ? &execution_summaries[index] : nullptr;
return remote_execution_summary;
}

size_t getTotalRows() const { return total_rows; }
Expand Down
Loading

0 comments on commit 74b6da0

Please sign in to comment.