.*: Remove useless settings join_concurrent_build #6183

Merged
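Removes the `join_concurrent_build` setting: the join build concurrency is now derived from the build pipeline itself (the larger of its plain streams and its non-joined-data streams), so the setting no longer serves a purpose. The round-robin `JoinInterpreterHelper::concurrencyBuildIndexGenerator` helper goes away with it, replaced by a per-group counter, and several sort-related signatures are tightened to take `const SortDescription &` along the way.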
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/MergeSortingBlockInputStream.cpp
@@ -70,7 +70,7 @@ static void enrichBlockWithConstants(Block & block, const Block & header)

MergeSortingBlockInputStream::MergeSortingBlockInputStream(
const BlockInputStreamPtr & input,
-    SortDescription & description_,
+    const SortDescription & description_,
size_t max_merged_block_size_,
size_t limit_,
size_t max_bytes_before_external_sort_,
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/MergeSortingBlockInputStream.h
@@ -92,7 +92,7 @@ class MergeSortingBlockInputStream : public IProfilingBlockInputStream
/// limit - if not 0, allowed to return just first 'limit' rows in sorted order.
MergeSortingBlockInputStream(
const BlockInputStreamPtr & input,
-    SortDescription & description_,
+    const SortDescription & description_,
size_t max_merged_block_size_,
size_t limit_,
size_t max_bytes_before_external_sort_,
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/PartialSortingBlockInputStream.h
@@ -30,7 +30,7 @@ class PartialSortingBlockInputStream : public IProfilingBlockInputStream
/// limit - if not 0, then you can sort each block not completely, but only `limit` first rows by order.
PartialSortingBlockInputStream(
const BlockInputStreamPtr & input_,
-    SortDescription & description_,
+    const SortDescription & description_,
const String & req_id,
size_t limit_ = 0)
: description(description_)
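The three `const SortDescription &` changes above (and the matching `orderStreams` change in InterpreterUtils below, which switches from pass-by-value) are pure const-correctness fixes. As a minimal stand-alone sketch — not TiFlash code; `SortDescription` is simplified to a plain vector here — a const reference can bind to a temporary and promises not to mutate the caller's description, while avoiding the copy a by-value parameter would make:

```cpp
#include <string>
#include <vector>

// Simplified stand-in for DB::SortDescription (assumption: the real type
// is a vector of per-column sort settings; this sketch is not TiFlash code).
struct SortColumnDescription
{
    std::string column_name;
    int direction = 1;
};
using SortDescription = std::vector<SortColumnDescription>;

// Before: `SortDescription & description_` could not bind a temporary and
// allowed the callee to mutate the caller's object.
// After:  `const SortDescription & description_` accepts temporaries and
// documents read-only access, with no copy.
void makeSortingStream(const SortDescription & description)
{
    (void)description; // a real stream would store and use the description
}

int main()
{
    makeSortingStream(SortDescription{{"col_a", /*direction=*/1}}); // ill-formed with a non-const reference
    return 0;
}
```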
21 changes: 13 additions & 8 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
@@ -268,18 +268,23 @@ void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline &

auto & join_execute_info = dagContext().getJoinExecuteInfoMap()[query_block.source_name];

-    size_t join_build_concurrency = settings.join_concurrent_build ? std::min(max_streams, build_pipeline.streams.size()) : 1;
+    size_t join_build_concurrency = std::max(build_pipeline.streams.size(), build_pipeline.streams_with_non_joined_data.size());

/// build side streams
executeExpression(build_pipeline, build_side_prepare_actions, log, "append join key and join filters for build side");
// add a HashJoinBuildBlockInputStream to build a shared hash table
-    auto get_concurrency_build_index = JoinInterpreterHelper::concurrencyBuildIndexGenerator(join_build_concurrency);
-    build_pipeline.transform([&](auto & stream) {
-        stream = std::make_shared<HashJoinBuildBlockInputStream>(stream, join_ptr, get_concurrency_build_index(), log->identifier());
-        stream->setExtraInfo(
-            fmt::format("join build, build_side_root_executor_id = {}", dagContext().getJoinExecuteInfoMap()[query_block.source_name].build_side_root_executor_id));
-        join_execute_info.join_build_streams.push_back(stream);
-    });
+    auto build_streams = [&](BlockInputStreams & streams) {
+        size_t build_index = 0;
+        for (auto & stream : streams)
+        {
+            stream = std::make_shared<HashJoinBuildBlockInputStream>(stream, join_ptr, build_index++, log->identifier());
+            stream->setExtraInfo(
+                fmt::format("join build, build_side_root_executor_id = {}", dagContext().getJoinExecuteInfoMap()[query_block.source_name].build_side_root_executor_id));
+            join_execute_info.join_build_streams.push_back(stream);
+        }
+    };
+    build_streams(build_pipeline.streams);
+    build_streams(build_pipeline.streams_with_non_joined_data);
// for test, join executor need the return blocks to output.
executeUnion(build_pipeline, max_streams, log, /*ignore_block=*/!context.isTest(), "for join");

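To make the new indexing scheme concrete, here is a minimal stand-alone sketch (plain strings stand in for `BlockInputStreams`; nothing here is TiFlash code). Concurrency now comes from the pipeline's own stream counts rather than a setting, and each stream group numbers its streams from zero — which is why the round-robin `concurrencyBuildIndexGenerator` deleted further down is no longer needed:

```cpp
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

// Stand-in for BlockInputStreams; the real code wraps each stream in a
// HashJoinBuildBlockInputStream carrying its build index.
using Streams = std::vector<std::string>;

int main()
{
    Streams streams = {"s0", "s1", "s2"};
    Streams streams_with_non_joined_data = {"n0", "n1"};

    // Concurrency is derived from the pipeline, not from the removed
    // join_concurrent_build setting.
    const size_t join_build_concurrency = std::max(streams.size(), streams_with_non_joined_data.size());

    // Each group restarts at zero: build_index is local to the lambda body,
    // so the two calls below assign 0,1,2 and 0,1 respectively.
    auto build_streams = [](Streams & group) {
        size_t build_index = 0;
        for (auto & stream : group)
            std::cout << stream << " -> build_index " << build_index++ << '\n';
    };
    build_streams(streams);
    build_streams(streams_with_non_joined_data);

    std::cout << "join_build_concurrency = " << join_build_concurrency << '\n';
    return 0;
}
```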
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
@@ -112,7 +112,7 @@ void executeExpression(
void orderStreams(
DAGPipeline & pipeline,
size_t max_streams,
-    SortDescription order_descr,
+    const SortDescription & order_descr,
Int64 limit,
bool enable_fine_grained_shuffle,
const Context & context,
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/InterpreterUtils.h
@@ -55,7 +55,7 @@ void executeExpression(
void orderStreams(
DAGPipeline & pipeline,
size_t max_streams,
-    SortDescription order_descr,
+    const SortDescription & order_descr,
Int64 limit,
bool enable_fine_grained_shuffle,
const Context & context,
8 changes: 0 additions & 8 deletions dbms/src/Flash/Coprocessor/JoinInterpreterHelper.cpp
@@ -345,12 +345,4 @@ std::tuple<ExpressionActionsPtr, Names, String> prepareJoin(
dag_analyzer.appendJoinKeyAndJoinFilters(chain, keys, key_types, key_names, left, is_right_out_join, filters, filter_column_name);
return {chain.getLastActions(), std::move(key_names), std::move(filter_column_name)};
}

-std::function<size_t()> concurrencyBuildIndexGenerator(size_t join_build_concurrency)
-{
-    size_t init_value = 0;
-    return [init_value, join_build_concurrency]() mutable {
-        return (init_value++) % join_build_concurrency;
-    };
-}
} // namespace DB::JoinInterpreterHelper
2 changes: 0 additions & 2 deletions dbms/src/Flash/Coprocessor/JoinInterpreterHelper.h
@@ -127,7 +127,5 @@ std::tuple<ExpressionActionsPtr, Names, String> prepareJoin(
bool left,
bool is_right_out_join,
const google::protobuf::RepeatedPtrField<tipb::Expr> & filters);

-std::function<size_t()> concurrencyBuildIndexGenerator(size_t join_build_concurrency);
} // namespace JoinInterpreterHelper
} // namespace DB
21 changes: 12 additions & 9 deletions dbms/src/Flash/Planner/plans/PhysicalJoin.cpp
@@ -214,21 +214,24 @@ void PhysicalJoin::probeSideTransform(DAGPipeline & probe_pipeline, Context & co
void PhysicalJoin::buildSideTransform(DAGPipeline & build_pipeline, Context & context, size_t max_streams)
{
auto & dag_context = *context.getDAGContext();
-    const auto & settings = context.getSettingsRef();

-    size_t join_build_concurrency = settings.join_concurrent_build ? std::min(max_streams, build_pipeline.streams.size()) : 1;
+    size_t join_build_concurrency = std::max(build_pipeline.streams.size(), build_pipeline.streams_with_non_joined_data.size());

/// build side streams
executeExpression(build_pipeline, build_side_prepare_actions, log, "append join key and join filters for build side");
// add a HashJoinBuildBlockInputStream to build a shared hash table
-    auto get_concurrency_build_index = JoinInterpreterHelper::concurrencyBuildIndexGenerator(join_build_concurrency);
String join_build_extra_info = fmt::format("join build, build_side_root_executor_id = {}", build()->execId());
auto & join_execute_info = dag_context.getJoinExecuteInfoMap()[execId()];
-    build_pipeline.transform([&](auto & stream) {
-        stream = std::make_shared<HashJoinBuildBlockInputStream>(stream, join_ptr, get_concurrency_build_index(), log->identifier());
-        stream->setExtraInfo(join_build_extra_info);
-        join_execute_info.join_build_streams.push_back(stream);
-    });
+    auto build_streams = [&](BlockInputStreams & streams) {
+        size_t build_index = 0;
+        for (auto & stream : streams)
+        {
+            stream = std::make_shared<HashJoinBuildBlockInputStream>(stream, join_ptr, build_index++, log->identifier());
+            stream->setExtraInfo(join_build_extra_info);
+            join_execute_info.join_build_streams.push_back(stream);
+        }
+    };
+    build_streams(build_pipeline.streams);
+    build_streams(build_pipeline.streams_with_non_joined_data);
// for test, join executor need the return blocks to output.
executeUnion(build_pipeline, max_streams, log, /*ignore_block=*/!context.isTest(), "for join");

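This is the planner-side twin of the `handleJoin` change above: the same pipeline-derived concurrency and the same per-group `build_index` numbering, keeping the interpreter and planner paths consistent.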
1 change: 0 additions & 1 deletion dbms/src/Interpreters/Settings.h
@@ -339,7 +339,6 @@ struct Settings
M(SettingUInt64, max_bytes_in_distinct, 0, "Maximum total size of state (in uncompressed bytes) in memory for the execution of DISTINCT.") \
M(SettingOverflowMode<false>, distinct_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \
\
-    M(SettingBool, join_concurrent_build, true, "Build hash table concurrently for join.") \
M(SettingUInt64, max_memory_usage, 0, "Maximum memory usage for processing of single query. Zero means unlimited.") \
M(SettingUInt64, max_memory_usage_for_user, 0, "Maximum memory usage for processing all concurrently running queries for the user. Zero means unlimited.") \
M(SettingUInt64, max_memory_usage_for_all_queries, 0, "Maximum memory usage for processing all concurrently running queries on the server. Zero means unlimited.") \
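For readers who don't have Settings.h handy: each `M(...)` row in this header is an X-macro entry that both declares the setting and registers its name and description, so deleting the single row removes `join_concurrent_build` everywhere the table is expanded. A rough, hypothetical sketch of the pattern (the real header uses `SettingBool`/`SettingUInt64` wrapper types and a much longer list):

```cpp
#include <cstdint>
#include <iostream>

// Hypothetical, simplified X-macro table; the first row is the one this PR
// deletes from the real dbms/src/Interpreters/Settings.h.
#define APPLY_FOR_SETTINGS(M) \
    M(bool, join_concurrent_build, true, "Build hash table concurrently for join.") \
    M(uint64_t, max_memory_usage, 0, "Maximum memory usage for a single query.")

struct Settings
{
#define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) TYPE NAME{DEFAULT};
    APPLY_FOR_SETTINGS(DECLARE)
#undef DECLARE
};

int main()
{
    // The same table drives every expansion site, so removing one row
    // removes the member, its default, and its description together.
#define PRINT(TYPE, NAME, DEFAULT, DESCRIPTION) \
    std::cout << #NAME << ": " << DESCRIPTION << '\n';
    APPLY_FOR_SETTINGS(PRINT)
#undef PRINT
    return Settings{}.join_concurrent_build ? 0 : 1;
}
```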