Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP].*: Refine spill agg #6997

Closed
wants to merge 21 commits into from
8 changes: 5 additions & 3 deletions dbms/src/DataStreams/AggregatingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
// limitations under the License.

#include <DataStreams/AggregatingBlockInputStream.h>
#include <DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h>
#include <DataStreams/MergingAndConvertingBlockInputStream.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/SpilledRestoreMergingBlockInputStream.h>

namespace DB
{
Expand Down Expand Up @@ -67,8 +67,10 @@ Block AggregatingBlockInputStream::readImpl()
aggregator.spill(*data_variants);
}
aggregator.finishSpill();
BlockInputStreams input_streams = aggregator.restoreSpilledData();
impl = std::make_unique<MergingAggregatedMemoryEfficientBlockInputStream>(input_streams, params, final, 1, 1, log->identifier());
if (!aggregator.hasRestoreData())
impl = std::make_unique<NullBlockInputStream>(aggregator.getHeader(final));
else
impl = std::make_unique<SpilledRestoreMergingBlockInputStream>(aggregator, final, log->identifier());
}
}

Expand Down
31 changes: 20 additions & 11 deletions dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
#include <Common/FmtUtils.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h>
#include <DataStreams/MergingAndConvertingBlockInputStream.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/ParallelAggregatingBlockInputStream.h>
#include <DataStreams/SpilledRestoreMergingBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>

namespace DB
Expand All @@ -29,14 +29,12 @@ ParallelAggregatingBlockInputStream::ParallelAggregatingBlockInputStream(
const Aggregator::Params & params_,
bool final_,
size_t max_threads_,
size_t temporary_data_merge_threads_,
const String & req_id)
: log(Logger::get(req_id))
, params(params_)
, aggregator(params, req_id)
, final(final_)
, max_threads(std::min(inputs.size(), max_threads_))
, temporary_data_merge_threads(temporary_data_merge_threads_)
, keys_size(params.keys_size)
, aggregates_size(params.aggregates_size)
, handler(*this)
Expand Down Expand Up @@ -112,14 +110,25 @@ Block ParallelAggregatingBlockInputStream::readImpl()
*/

aggregator.finishSpill();
BlockInputStreams input_streams = aggregator.restoreSpilledData();
impl = std::make_unique<MergingAggregatedMemoryEfficientBlockInputStream>(
input_streams,
params,
final,
temporary_data_merge_threads,
temporary_data_merge_threads,
log->identifier());
if (!aggregator.hasRestoreData())
{
impl = std::make_unique<NullBlockInputStream>(aggregator.getHeader(final));
}
else
{
size_t restore_merge_stream_num = std::max(max_threads, 1);
if (restore_merge_stream_num > 1)
{
BlockInputStreams merging_streams;
for (size_t i = 0; i < restore_merge_stream_num; ++i)
merging_streams.push_back(std::make_shared<SpilledRestoreMergingBlockInputStream>(aggregator, final, log->identifier()));
impl = std::make_unique<UnionBlockInputStream<>>(merging_streams, BlockInputStreams{}, max_threads, log->identifier());
}
else
{
impl = std::make_unique<SpilledRestoreMergingBlockInputStream>(aggregator, final, log->identifier());
}
}
}

executed = true;
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream
const Aggregator::Params & params_,
bool final_,
size_t max_threads_,
size_t temporary_data_merge_threads_,
const String & req_id);

String getName() const override { return NAME; }
Expand Down Expand Up @@ -69,7 +68,6 @@ class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream
Aggregator aggregator;
bool final;
size_t max_threads;
size_t temporary_data_merge_threads;

size_t keys_size;
size_t aggregates_size;
Expand Down
26 changes: 15 additions & 11 deletions dbms/src/DataStreams/SpilledFilesInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,34 @@ SpilledFilesInputStream::SpilledFilesInputStream(std::vector<SpilledFileInfo> &&
, max_supported_spill_version(max_supported_spill_version_)
{
RUNTIME_CHECK_MSG(!spilled_file_infos.empty(), "Spilled files must not be empty");
current_reading_file_index = 0;
current_file_stream = std::make_unique<SpilledFileStream>(std::move(spilled_file_infos[0]), header, file_provider, max_supported_spill_version);
}

Block SpilledFilesInputStream::readImpl()
{
if (unlikely(current_file_stream == nullptr))
if unlikely (!initialized)
{
iter = spilled_file_infos.begin();
assert(iter != spilled_file_infos.end());
current_file_stream = std::make_unique<SpilledFileStream>(std::move(*iter), header, file_provider, max_supported_spill_version);
initialized = true;
}
if unlikely (iter == spilled_file_infos.end())
return {};

assert(current_file_stream);
Block ret = current_file_stream->block_in->read();
if (ret)
return ret;

for (++current_reading_file_index; current_reading_file_index < spilled_file_infos.size(); ++current_reading_file_index)
current_file_stream.reset();
for (++iter; iter != spilled_file_infos.end(); ++iter)
{
current_file_stream = std::make_unique<SpilledFileStream>(std::move(spilled_file_infos[current_reading_file_index]),
header,
file_provider,
max_supported_spill_version);
current_file_stream = std::make_unique<SpilledFileStream>(std::move(*iter), header, file_provider, max_supported_spill_version);
ret = current_file_stream->block_in->read();
if (ret)
return ret;
current_file_stream.reset();
}
current_file_stream.reset();
return ret;
return {};
}

Block SpilledFilesInputStream::getHeader() const
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/DataStreams/SpilledFilesInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ class SpilledFilesInputStream : public IProfilingBlockInputStream
};

std::vector<SpilledFileInfo> spilled_file_infos;
size_t current_reading_file_index;
std::vector<SpilledFileInfo>::iterator iter;
bool initialized = false;
Block header;
FileProviderPtr file_provider;
Int64 max_supported_spill_version;
Expand Down
59 changes: 59 additions & 0 deletions dbms/src/DataStreams/SpilledRestoreMergingBlockInputStream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <DataStreams/IProfilingBlockInputStream.h>
#include <Interpreters/Aggregator.h>

namespace DB
{
/// Restores aggregated data previously spilled to disk by `aggregator` and
/// merges it back bucket by bucket, emitting the merged result as a stream
/// of blocks. Several instances may be created over the same Aggregator to
/// restore buckets concurrently (each call to restoreBucketBlocks hands out
/// a distinct batch of bucket blocks — assumed from the caller's usage;
/// confirm against Aggregator::restoreBucketBlocks).
class SpilledRestoreMergingBlockInputStream : public IProfilingBlockInputStream
{
public:
    /// @param aggregator_ the aggregator that owns the spilled files to restore from
    /// @param is_final_   whether aggregate functions are finalized in output blocks
    /// @param req_id      request identifier used for logging
    SpilledRestoreMergingBlockInputStream(Aggregator & aggregator_, bool is_final_, const String & req_id)
        : aggregator(aggregator_)
        , is_final(is_final_)
        , log(Logger::get(req_id))
    {}

    String getName() const override { return "SpilledRestoreMerging"; }

    Block getHeader() const override { return aggregator.getHeader(is_final); }

protected:
    /// Drains blocks produced by the previous merge; when exhausted, restores
    /// the next batch of bucket blocks from disk and merges them. Returns an
    /// empty block once every spilled bucket has been consumed.
    Block readImpl() override
    {
        for (;;)
        {
            // First hand out anything left over from the last merge.
            if (Block next_block = popBlocksListFront(pending_blocks))
                return next_block;

            // Nothing pending: pull the next bucket's blocks from the spill files.
            auto restored_bucket_blocks = aggregator.restoreBucketBlocks();
            if (restored_bucket_blocks.empty())
                return {}; // all spilled data has been restored and merged
            pending_blocks = aggregator.vstackBlocks(restored_bucket_blocks, is_final);
        }
    }

private:
    Aggregator & aggregator;
    bool is_final;
    const LoggerPtr log;

    // Blocks produced by the latest vstackBlocks merge, not yet returned to the caller.
    BlocksList pending_blocks;
};
} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ Aggregator::Params buildParams(
const TiDB::TiDBCollators & collators,
const AggregateDescriptions & aggregate_descriptions,
bool is_final_agg,
bool is_local_agg,
const SpillConfig & spill_config)
{
ColumnNumbers keys;
Expand All @@ -107,6 +108,7 @@ Aggregator::Params buildParams(
getAverageThreshold(total_two_level_threshold_bytes, agg_streams_size),
getAverageThreshold(settings.max_bytes_before_external_group_by, agg_streams_size),
!is_final_agg,
is_local_agg,
spill_config,
context.getSettingsRef().max_block_size,
has_collator ? collators : TiDB::dummy_collators);
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ Aggregator::Params buildParams(
const TiDB::TiDBCollators & collators,
const AggregateDescriptions & aggregate_descriptions,
bool is_final_agg,
bool is_local_agg,
const SpillConfig & spill_config);

void fillArgColumnNumbers(AggregateDescriptions & aggregate_descriptions, const Block & before_agg_header);
Expand Down
25 changes: 9 additions & 16 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,8 @@ void DAGQueryBlockInterpreter::executeAggregation(
settings.max_spilled_rows_per_file,
settings.max_spilled_bytes_per_file,
context.getFileProvider());
assert(!pipeline.streams.empty());
bool is_local_agg = enable_fine_grained_shuffle || (1 == pipeline.streams.size());
auto params = AggregationInterpreterHelper::buildParams(
context,
before_agg_header,
Expand All @@ -422,31 +424,32 @@ void DAGQueryBlockInterpreter::executeAggregation(
collators,
aggregate_descriptions,
is_final_agg,
is_local_agg,
spill_config);

if (enable_fine_grained_shuffle)
if (is_local_agg)
{
/// Go straight forward without merging phase when enable_fine_grained_shuffle
auto extra_info = enable_fine_grained_shuffle ? String(enableFineGrainedShuffleExtraInfo) : "";
/// Go straight forward without merging phase for local agg.
pipeline.transform([&](auto & stream) {
stream = std::make_shared<AggregatingBlockInputStream>(
stream,
params,
true,
log->identifier());
stream->setExtraInfo(String(enableFineGrainedShuffleExtraInfo));
stream->setExtraInfo(extra_info);
});
recordProfileStreams(pipeline, query_block.aggregation_name);
}
else if (pipeline.streams.size() > 1)
else
{
/// If there are several sources, then we perform parallel aggregation
/// If there are several sources(non local agg), then we perform parallel aggregation
BlockInputStreamPtr stream = std::make_shared<ParallelAggregatingBlockInputStream>(
pipeline.streams,
BlockInputStreams{},
params,
true,
max_streams,
settings.aggregation_memory_efficient_merge_threads ? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads) : static_cast<size_t>(settings.max_threads),
log->identifier());

pipeline.streams.resize(1);
Expand All @@ -456,16 +459,6 @@ void DAGQueryBlockInterpreter::executeAggregation(
recordProfileStreams(pipeline, query_block.aggregation_name);
restorePipelineConcurrency(pipeline);
}
else
{
assert(pipeline.streams.size() == 1);
pipeline.firstStream() = std::make_shared<AggregatingBlockInputStream>(
pipeline.firstStream(),
params,
true,
log->identifier());
recordProfileStreams(pipeline, query_block.aggregation_name);
}
}

void DAGQueryBlockInterpreter::executeWindowOrder(DAGPipeline & pipeline, SortDescription sort_desc, bool enable_fine_grained_shuffle)
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ void EventTask::finalize() noexcept
}
catch (...)
{
// ignore exception from finalizeImpl.
LOG_WARNING(log, "finalizeImpl throw exception: {}", getCurrentExceptionMessage(true, true));
exec_status.onErrorOccurred(std::current_exception());
}
}

Expand Down
26 changes: 9 additions & 17 deletions dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ void PhysicalAggregation::buildBlockInputStreamImpl(DAGPipeline & pipeline, Cont
context.getSettingsRef().max_spilled_rows_per_file,
context.getSettingsRef().max_spilled_bytes_per_file,
context.getFileProvider());
assert(!pipeline.streams.empty());
bool is_local_agg = fine_grained_shuffle.enable() || pipeline.streams.size() == 1;
auto params = AggregationInterpreterHelper::buildParams(
context,
before_agg_header,
Expand All @@ -112,47 +114,38 @@ void PhysicalAggregation::buildBlockInputStreamImpl(DAGPipeline & pipeline, Cont
aggregation_collators,
aggregate_descriptions,
is_final_agg,
is_local_agg,
spill_config);

if (fine_grained_shuffle.enable())
if (is_local_agg)
{
/// For fine_grained_shuffle, just do aggregation in streams independently
auto extra_info = fine_grained_shuffle.enable() ? String(enableFineGrainedShuffleExtraInfo) : "";
/// For local agg, just do aggregation in streams independently.
pipeline.transform([&](auto & stream) {
stream = std::make_shared<AggregatingBlockInputStream>(
stream,
params,
true,
log->identifier());
stream->setExtraInfo(String(enableFineGrainedShuffleExtraInfo));
stream->setExtraInfo(extra_info);
});
}
else if (pipeline.streams.size() > 1)
else
{
/// If there are several sources, then we perform parallel aggregation
const Settings & settings = context.getSettingsRef();
BlockInputStreamPtr stream = std::make_shared<ParallelAggregatingBlockInputStream>(
pipeline.streams,
BlockInputStreams{},
params,
true,
max_streams,
settings.aggregation_memory_efficient_merge_threads ? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads) : static_cast<size_t>(settings.max_threads),
log->identifier());

pipeline.streams.resize(1);
pipeline.firstStream() = std::move(stream);

restoreConcurrency(pipeline, context.getDAGContext()->final_concurrency, log);
}
else
{
assert(pipeline.streams.size() == 1);
pipeline.firstStream() = std::make_shared<AggregatingBlockInputStream>(
pipeline.firstStream(),
params,
true,
log->identifier());
}

// we can record for agg after restore concurrency.
// Because the streams of expr_after_agg will provide the correct ProfileInfo.
Expand All @@ -163,8 +156,7 @@ void PhysicalAggregation::buildBlockInputStreamImpl(DAGPipeline & pipeline, Cont

void PhysicalAggregation::buildPipeline(PipelineBuilder & builder)
{
auto aggregate_context = std::make_shared<AggregateContext>(
log->identifier());
auto aggregate_context = std::make_shared<AggregateContext>(log->identifier());
// TODO support fine grained shuffle.
assert(!fine_grained_shuffle.enable());
auto agg_build = std::make_shared<PhysicalAggregationBuild>(
Expand Down
Loading