Skip to content

Commit

Permalink
Split aggregation output into blocks no larger than max block size (#6590)
Browse files Browse the repository at this point in the history
close #6579
  • Loading branch information
mengxin9014 authored Jan 13, 2023
1 parent e04179e commit 53862f2
Show file tree
Hide file tree
Showing 10 changed files with 445 additions and 32 deletions.
1 change: 1 addition & 0 deletions dbms/src/Core/Block.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ class Block

using Blocks = std::vector<Block>;
using BlocksList = std::list<Block>;
using BucketBlocksListMap = std::map<Int32, BlocksList>;

Block mergeBlocks(Blocks && blocks);

Expand Down
68 changes: 48 additions & 20 deletions dbms/src/DataStreams/MergingAndConvertingBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,6 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream
if (data.empty())
return {};

if (current_bucket_num >= NUM_BUCKETS)
return {};

FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_aggregate_merge_failpoint);

AggregatedDataVariantsPtr & first = data[0];
Expand All @@ -91,14 +88,21 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream
if (first->type == AggregatedDataVariants::Type::without_key)
{
aggregator.mergeWithoutKeyDataImpl(data);
return aggregator.prepareBlockAndFillWithoutKey(
single_level_blocks = aggregator.prepareBlocksAndFillWithoutKey(
*first,
final);
return popBlocksListFront(single_level_blocks);
}
}

if (!first->isTwoLevel())
{
Block out_block = popBlocksListFront(single_level_blocks);
if (likely(out_block))
{
return out_block;
}

if (current_bucket_num > 0)
return {};

Expand All @@ -120,51 +124,64 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
}
#undef M
return aggregator.prepareBlockAndFillSingleLevel(*first, final);
single_level_blocks = aggregator.prepareBlocksAndFillSingleLevel(*first, final);
return popBlocksListFront(single_level_blocks);
}
else
{
Block out_block = popBlocksListFront(two_level_blocks);
if (likely(out_block))
{
return out_block;
}
if (!parallel_merge_data)
{
parallel_merge_data = std::make_unique<ParallelMergeData>(threads);
for (size_t i = 0; i < threads; ++i)
scheduleThreadForNextBucket();
}

Block res;

while (true)
{
std::unique_lock lock(parallel_merge_data->mutex);

if (current_bucket_num >= NUM_BUCKETS)
{
return {};
}

if (parallel_merge_data->exception)
std::rethrow_exception(parallel_merge_data->exception);

auto it = parallel_merge_data->ready_blocks.find(current_bucket_num);
if (it != parallel_merge_data->ready_blocks.end())
{
++current_bucket_num;
scheduleThreadForNextBucket();
current_bucket_num++;

if (it->second)
if (!it->second.empty())
{
res.swap(it->second);
break;
two_level_blocks.splice(two_level_blocks.end(), std::move(it->second), it->second.begin(), it->second.end());

Block out = popBlocksListFront(two_level_blocks);
if (likely(out))
{
return out;
}
}
else if (current_bucket_num >= NUM_BUCKETS)
break;
}

continue;
}
parallel_merge_data->condvar.wait(lock);
}

return res;
}
}

private:
const LoggerPtr log;
const Aggregator & aggregator;
BlocksList single_level_blocks;
BlocksList two_level_blocks;
ManyAggregatedDataVariants data;
bool final;
size_t threads;
Expand All @@ -175,7 +192,7 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream

struct ParallelMergeData
{
std::map<Int32, Block> ready_blocks;
BucketBlocksListMap ready_blocks;
std::exception_ptr exception;
std::mutex mutex;
std::condition_variable condvar;
Expand Down Expand Up @@ -203,7 +220,7 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream
{
auto & merged_data = *data[0];
auto method = merged_data.type;
Block block;
BlocksList blocks;

/// Select Arena to avoid race conditions
size_t thread_number = static_cast<size_t>(bucket_num) % threads;
Expand All @@ -213,7 +230,7 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream
case AggregationMethodType(NAME): \
{ \
aggregator.mergeBucketImpl<AggregationMethodName(NAME)>(data, bucket_num, arena); \
block = aggregator.convertOneBucketToBlock( \
blocks = aggregator.convertOneBucketToBlocks( \
merged_data, \
*ToAggregationMethodPtr(NAME, merged_data.aggregation_method_impl), \
arena, \
Expand All @@ -230,7 +247,7 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream
#undef M

std::lock_guard lock(parallel_merge_data->mutex);
parallel_merge_data->ready_blocks[bucket_num] = std::move(block);
parallel_merge_data->ready_blocks[bucket_num] = std::move(blocks);
}
catch (...)
{
Expand All @@ -241,6 +258,17 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream

parallel_merge_data->condvar.notify_all();
}

/// Removes the first block from `blocks` and returns it.
///
/// Returns a default-constructed Block when `blocks` is empty; callers in this stream
/// test the result (`if (likely(out_block))`) to tell "a buffered block was available"
/// apart from "nothing buffered yet".
///
/// The front element is moved out instead of copied: it is destroyed by pop_front()
/// right afterwards, so a copy would be pure overhead.
static Block popBlocksListFront(BlocksList & blocks)
{
    if (blocks.empty())
        return {};

    Block out_block = std::move(blocks.front());
    blocks.pop_front();
    return out_block;
}
};

#undef AggregationMethodName
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,8 @@ Block ParallelAggregatingBlockInputStream::readImpl()
executed = true;
}

Block res;
if (isCancelledOrThrowIfKilled() || !impl)
return res;
return {};

return impl->read();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ Aggregator::Params buildParams(
settings.max_bytes_before_external_group_by,
!is_final_agg,
context.getTemporaryPath(),
context.getSettingsRef().max_block_size,
has_collator ? collators : TiDB::dummy_collators);
}

Expand Down
56 changes: 55 additions & 1 deletion dbms/src/Flash/tests/gtest_aggregation_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,20 @@ class AggExecutorTestRunner : public ExecutorTest
{{"key", TiDB::TP::TypeLong}, {"value", TiDB::TP::TypeString}},
{toNullableVec<Int32>("key", key), toNullableVec<String>("value", value)});
}
{
    // Mock table "big_table_4": 1024 rows, row i holding key i and value "val_<i>",
    // so every row has a distinct key.
    constexpr size_t row_count = 1024;
    std::vector<std::optional<TypeTraits<int>::FieldType>> keys(row_count);
    std::vector<std::optional<String>> values(row_count);
    for (size_t row = 0; row < row_count; ++row)
    {
        keys[row] = row;
        values[row] = {fmt::format("val_{}", row)};
    }
    context.addMockTable(
        {"test_db", "big_table_4"},
        {{"key", TiDB::TP::TypeLong}, {"value", TiDB::TP::TypeString}},
        {toNullableVec<Int32>("key", keys), toNullableVec<String>("value", values)});
}
}

std::shared_ptr<tipb::DAGRequest> buildDAGRequest(std::pair<String, String> src, MockAstVec agg_funcs, MockAstVec group_by_exprs, MockColumnNameVec proj)
Expand Down Expand Up @@ -531,7 +545,7 @@ CATCH
TEST_F(AggExecutorTestRunner, AggMerge)
try
{
std::vector<String> tables{"big_table_1", "big_table_2", "big_table_3"};
std::vector<String> tables{"big_table_1", "big_table_2", "big_table_3", "big_table_4"};
for (const auto & table : tables)
{
auto request = context
Expand All @@ -552,5 +566,45 @@ try
}
CATCH

/// Runs `max(value) group by key` over each mock table for every combination of
/// two-level threshold, max_block_size and concurrency, and checks that
///   * no returned block has more rows than the configured max_block_size, and
///   * the rows summed over all returned blocks match the expected count for that table.
TEST_F(AggExecutorTestRunner, SplitAggOutput)
try
{
    std::vector<String> table_names{"big_table_1", "big_table_2", "big_table_3", "big_table_4"};
    // Expected total output rows, indexed in parallel with table_names.
    const std::vector<size_t> expected_row_counts{15, 200, 1, 1024};
    const std::vector<size_t> block_size_limits{1, 2, 9, 19, 40, DEFAULT_BLOCK_SIZE};
    const std::vector<size_t> concurrency_levels{1, 2, 10};
    // Values for group_by_two_level_threshold:
    // 0: use one level
    // 1: use two level
    const std::vector<UInt64> level_thresholds{0, 1};

    for (size_t table_idx = 0; table_idx < table_names.size(); ++table_idx)
    {
        auto request = context
                           .scan("test_db", table_names[table_idx])
                           .aggregation({Max(col("value"))}, {col("key")})
                           .build(context);
        context.context.setSetting("group_by_two_level_threshold_bytes", Field(static_cast<UInt64>(0)));

        for (auto level_threshold : level_thresholds)
        {
            for (auto row_limit : block_size_limits)
            {
                for (auto concurrency : concurrency_levels)
                {
                    context.context.setSetting("group_by_two_level_threshold", Field(static_cast<UInt64>(level_threshold)));
                    context.context.setSetting("max_block_size", Field(static_cast<UInt64>(row_limit)));

                    auto blocks = getExecuteStreamsReturnBlocks(request, concurrency);

                    size_t total_rows = 0;
                    for (const auto & block : blocks)
                    {
                        // Each individual block must respect the configured limit.
                        ASSERT(block.rows() <= row_limit);
                        total_rows += block.rows();
                    }
                    ASSERT_EQ(total_rows, expected_row_counts[table_idx]);
                }
            }
        }
    }
}
CATCH

} // namespace tests
} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Flash/tests/gtest_join_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ try
ColumnsWithTypeAndName common_column_data;
size_t table_rows = 102400;
size_t common_rows = 20480;
size_t max_block_size = 800;
UInt64 max_block_size = 800;
size_t original_max_streams = 20;
for (const auto & column_info : mockColumnInfosToTiDBColumnInfos(left_column_infos))
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/tests/gtest_spill_sort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ try
DB::MockColumnInfoVec column_infos{{"a", TiDB::TP::TypeLongLong}, {"b", TiDB::TP::TypeLongLong}, {"c", TiDB::TP::TypeLongLong}, {"d", TiDB::TP::TypeLongLong}, {"e", TiDB::TP::TypeLongLong}};
ColumnsWithTypeAndName column_data;
size_t table_rows = 102400;
size_t max_block_size = 500;
UInt64 max_block_size = 500;
size_t original_max_streams = 20;
size_t total_data_size = 0;
size_t limit_size = table_rows / 10 * 9;
Expand Down
Loading

0 comments on commit 53862f2

Please sign in to comment.