Skip to content

Commit

Permalink
.*: Separate FineGrainedShuffleWriter, HashParitionWriter, `Broad…
Browse files Browse the repository at this point in the history
…castOrPassThroughWriter` from `StreamingDAGResponseWriter` (#5910)

ref #5900
  • Loading branch information
SeaRise authored Sep 27, 2022
1 parent 33b79e0 commit ef34f3b
Show file tree
Hide file tree
Showing 20 changed files with 1,582 additions and 694 deletions.
9 changes: 2 additions & 7 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,17 +128,12 @@ try
auto streaming_writer = std::make_shared<StreamWriter>(writer);
TiDB::TiDBCollators collators;

std::unique_ptr<DAGResponseWriter> response_writer = std::make_unique<StreamingDAGResponseWriter<StreamWriterPtr, false>>(
std::unique_ptr<DAGResponseWriter> response_writer = std::make_unique<StreamingDAGResponseWriter<StreamWriterPtr>>(
streaming_writer,
std::vector<Int64>(),
collators,
tipb::ExchangeType::PassThrough,
context.getSettingsRef().dag_records_per_chunk,
context.getSettingsRef().batch_send_min_limit,
true,
dag_context,
/*fine_grained_shuffle_stream_count=*/0,
/*fine_grained_shuffle_batch_size=*/0);
dag_context);
dag_output_stream = std::make_shared<DAGBlockOutputStream>(streams.in->getHeader(), std::move(response_writer));
copyData(*streams.in, *dag_output_stream);
}
Expand Down
57 changes: 22 additions & 35 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@
#include <Flash/Coprocessor/JoinInterpreterHelper.h>
#include <Flash/Coprocessor/MockSourceStream.h>
#include <Flash/Coprocessor/PushDownFilter.h>
#include <Flash/Coprocessor/StreamingDAGResponseWriter.h>
#include <Flash/Mpp/ExchangeReceiver.h>
#include <Flash/Mpp/newMPPExchangeWriter.h>
#include <Interpreters/Aggregator.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/Join.h>
Expand Down Expand Up @@ -738,44 +738,31 @@ void DAGQueryBlockInterpreter::handleExchangeSender(DAGPipeline & pipeline)
const uint64_t stream_count = query_block.exchange_sender->fine_grained_shuffle_stream_count();
const uint64_t batch_size = query_block.exchange_sender->fine_grained_shuffle_batch_size();

if (enableFineGrainedShuffle(stream_count))
auto enable_fine_grained_shuffle = enableFineGrainedShuffle(stream_count);
String extra_info;
if (enable_fine_grained_shuffle)
{
pipeline.transform([&](auto & stream) {
// construct writer
std::unique_ptr<DAGResponseWriter> response_writer = std::make_unique<StreamingDAGResponseWriter<MPPTunnelSetPtr, true>>(
context.getDAGContext()->tunnel_set,
partition_col_ids,
partition_col_collators,
exchange_sender.tp(),
context.getSettingsRef().dag_records_per_chunk,
context.getSettingsRef().batch_send_min_limit,
stream_id++ == 0, /// only one stream needs to sending execution summaries for the last response
dagContext(),
stream_count,
batch_size);
stream = std::make_shared<ExchangeSenderBlockInputStream>(stream, std::move(response_writer), log->identifier());
stream->setExtraInfo(String(enableFineGrainedShuffleExtraInfo));
});
extra_info = String(enableFineGrainedShuffleExtraInfo);
RUNTIME_CHECK(exchange_sender.tp() == tipb::ExchangeType::Hash, ExchangeType_Name(exchange_sender.tp()));
RUNTIME_CHECK(stream_count <= 1024, stream_count);
}
else
{
pipeline.transform([&](auto & stream) {
std::unique_ptr<DAGResponseWriter> response_writer = std::make_unique<StreamingDAGResponseWriter<MPPTunnelSetPtr, false>>(
context.getDAGContext()->tunnel_set,
partition_col_ids,
partition_col_collators,
exchange_sender.tp(),
context.getSettingsRef().dag_records_per_chunk,
context.getSettingsRef().batch_send_min_limit,
stream_id++ == 0, /// only one stream needs to sending execution summaries for the last response
dagContext(),
stream_count,
batch_size);
stream = std::make_shared<ExchangeSenderBlockInputStream>(stream, std::move(response_writer), log->identifier());
});
}
pipeline.transform([&](auto & stream) {
// construct writer
std::unique_ptr<DAGResponseWriter> response_writer = newMPPExchangeWriter(
context.getDAGContext()->tunnel_set,
partition_col_ids,
partition_col_collators,
exchange_sender.tp(),
context.getSettingsRef().dag_records_per_chunk,
context.getSettingsRef().batch_send_min_limit,
stream_id++ == 0, /// only one stream needs to sending execution summaries for the last response
dagContext(),
enable_fine_grained_shuffle,
stream_count,
batch_size);
stream = std::make_shared<ExchangeSenderBlockInputStream>(stream, std::move(response_writer), log->identifier());
stream->setExtraInfo(extra_info);
});
}

void DAGQueryBlockInterpreter::handleMockExchangeSender(DAGPipeline & pipeline)
Expand Down
21 changes: 20 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,24 @@

namespace DB
{
void ExecutionSummary::merge(const ExecutionSummary & other, bool streaming_call)
{
if (streaming_call)
{
time_processed_ns = std::max(time_processed_ns, other.time_processed_ns);
num_produced_rows = std::max(num_produced_rows, other.num_produced_rows);
num_iterations = std::max(num_iterations, other.num_iterations);
concurrency = std::max(concurrency, other.concurrency);
}
else
{
time_processed_ns = std::max(time_processed_ns, other.time_processed_ns);
num_produced_rows += other.num_produced_rows;
num_iterations += other.num_iterations;
concurrency += other.concurrency;
}
}

/// delta_mode means when for a streaming call, return the delta execution summary
/// because TiDB is not aware of the streaming call when it handle the execution summaries
/// so we need to "pretend to be a unary call", can be removed if TiDB support streaming
Expand Down Expand Up @@ -179,7 +197,8 @@ DAGResponseWriter::DAGResponseWriter(
{
records_per_chunk = -1;
}
if (dag_context.encode_type != tipb::EncodeType::TypeCHBlock && dag_context.encode_type != tipb::EncodeType::TypeChunk
if (dag_context.encode_type != tipb::EncodeType::TypeCHBlock
&& dag_context.encode_type != tipb::EncodeType::TypeChunk
&& dag_context.encode_type != tipb::EncodeType::TypeDefault)
{
throw TiFlashException(
Expand Down
28 changes: 3 additions & 25 deletions dbms/src/Flash/Coprocessor/DAGResponseWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,9 @@

#pragma once

#include <Core/Types.h>
#include <DataTypes/IDataType.h>
#include <Flash/Coprocessor/ChunkCodec.h>
#include <Flash/Coprocessor/DAGQuerySource.h>
#include <common/logger_useful.h>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <Flash/Coprocessor/DAGContext.h>
#include <common/types.h>
#include <tipb/select.pb.h>
#pragma GCC diagnostic pop

namespace DB
{
Expand All @@ -40,23 +34,7 @@ struct ExecutionSummary
, concurrency(0)
{}

void merge(const ExecutionSummary & other, bool streaming_call)
{
if (streaming_call)
{
time_processed_ns = std::max(time_processed_ns, other.time_processed_ns);
num_produced_rows = std::max(num_produced_rows, other.num_produced_rows);
num_iterations = std::max(num_iterations, other.num_iterations);
concurrency = std::max(concurrency, other.concurrency);
}
else
{
time_processed_ns = std::max(time_processed_ns, other.time_processed_ns);
num_produced_rows += other.num_produced_rows;
num_iterations += other.num_iterations;
concurrency += other.concurrency;
}
}
void merge(const ExecutionSummary & other, bool streaming_call);
};

class DAGResponseWriter
Expand Down
Loading

0 comments on commit ef34f3b

Please sign in to comment.