Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refine agg/topn and add more unit tests #6549

Merged
merged 9 commits into from
Dec 28, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
update
SeaRise committed Dec 26, 2022

Verified

This commit was signed with the committer’s verified signature.
commit 3a38b08e849f43134dd48ba322aeae1b15e75f50
114 changes: 8 additions & 106 deletions dbms/src/DataStreams/MergeSortingBlockInputStream.cpp
Original file line number Diff line number Diff line change
@@ -13,17 +13,21 @@
// limitations under the License.

#include <DataStreams/MergeSortingBlockInputStream.h>
#include <DataStreams/MergeSortingBlocksBlockInputStream.h>
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <IO/CompressedWriteBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <common/logger_useful.h>

namespace DB
{
namespace
{
/** Remove constant columns from block.
*/
static void removeConstantsFromBlock(Block & block)
void removeConstantsFromBlock(Block & block)
{
size_t columns = block.columns();
size_t i = 0;
@@ -39,7 +43,7 @@ static void removeConstantsFromBlock(Block & block)
}
}

static void removeConstantsFromSortDescription(const Block & header, SortDescription & description)
void removeConstantsFromSortDescription(const Block & header, SortDescription & description)
{
description.erase(
std::remove_if(description.begin(), description.end(), [&](const SortColumnDescription & elem) {
@@ -54,7 +58,7 @@ static void removeConstantsFromSortDescription(const Block & header, SortDescrip
/** Re-insert into the block, whose constant columns were removed by removeConstantsFromBlock,
* the constant columns from header (which must have the structure the block had before its constants were removed).
*/
static void enrichBlockWithConstants(Block & block, const Block & header)
void enrichBlockWithConstants(Block & block, const Block & header)
{
size_t rows = block.rows();
size_t columns = header.columns();
@@ -66,7 +70,7 @@ static void enrichBlockWithConstants(Block & block, const Block & header)
block.insert(i, {col_type_name.column->cloneResized(rows), col_type_name.type, col_type_name.name});
}
}

} // namespace

MergeSortingBlockInputStream::MergeSortingBlockInputStream(
const BlockInputStreamPtr & input,
@@ -177,110 +181,8 @@ Block MergeSortingBlockInputStream::readImpl()
return res;
}


/** Prepares the merge of already-read blocks.
  *
  * blocks_               - blocks to merge; held by reference and modified in place
  *                         (blocks without rows are dropped from it). Must contain at
  *                         least one element, because the header is cloned from blocks_.at(0).
  * description_          - sort description; copied into the stream.
  * req_id                - identifier passed to Logger::get to create the logger.
  * max_merged_block_size_ - row count at which mergeImpl hands out a block.
  * limit_                - if not 0, merging stops once that many rows have been produced in total.
  */
MergeSortingBlocksBlockInputStream::MergeSortingBlocksBlockInputStream(
    Blocks & blocks_,
    SortDescription & description_,
    const String & req_id,
    size_t max_merged_block_size_,
    size_t limit_)
    : blocks(blocks_)
    , header(blocks.at(0).cloneEmpty())
    , description(description_)
    , max_merged_block_size(max_merged_block_size_)
    , limit(limit_)
    , log(Logger::get(req_id))
{
    /// Collect the blocks that actually hold rows and create one cursor per such block.
    Blocks blocks_with_rows;
    for (const auto & candidate : blocks)
    {
        if (candidate.rows() != 0)
        {
            blocks_with_rows.push_back(candidate);
            cursors.emplace_back(candidate, description);
            if (cursors.back().has_collation)
                has_collation = true;
        }
    }

    /// The referenced vector now holds only the non-empty blocks.
    blocks.swap(blocks_with_rows);

    /// Only the queue matching the cursor flavour is filled; the other one stays empty.
    if (has_collation)
    {
        for (auto & cursor_impl : cursors)
            queue_with_collation.push(SortCursorWithCollation(&cursor_impl));
        return;
    }

    for (auto & cursor_impl : cursors)
        queue.push(SortCursor(&cursor_impl));
}


/** Returns the next merged block, or an empty Block when nothing is left.
  * A single remaining block is returned as is, without going through the merge.
  */
Block MergeSortingBlocksBlockInputStream::readImpl()
{
    switch (blocks.size())
    {
    case 0:
        return Block();
    case 1:
    {
        /// Nothing to merge with: hand the only block out and mark the stream as finished.
        Block single = blocks[0];
        blocks.clear();
        return single;
    }
    default:
        break;
    }

    if (has_collation)
        return mergeImpl<SortCursorWithCollation>(queue_with_collation);

    return mergeImpl<SortCursor>(queue);
}


/** Pops rows from the cursor queue in sorted order and appends them to a fresh set of columns.
  * Returns as soon as either 'max_merged_block_size' rows were collected in this call,
  * or 'limit' rows (if limit is not 0) were collected over all calls; in the latter case
  * 'blocks' is cleared so that the next readImpl() reports the end of the stream.
  * Returns an empty Block if the queue was already exhausted.
  */
template <typename TSortCursor>
Block MergeSortingBlocksBlockInputStream::mergeImpl(std::priority_queue<TSortCursor> & queue)
{
    const size_t column_count = blocks[0].columns();

    MutableColumns out_columns = blocks[0].cloneEmptyColumns();
    /// TODO: reserve (in each column)

    /// Wraps the collected columns into a block with the structure of the first input block.
    /// Invoked at most once per call because it moves the columns out.
    auto make_result = [&] {
        return blocks[0].cloneWithColumns(std::move(out_columns));
    };

    size_t rows_in_block = 0;
    while (!queue.empty())
    {
        TSortCursor smallest = queue.top();
        queue.pop();

        for (size_t col = 0; col != column_count; ++col)
            out_columns[col]->insertFrom(*smallest->all_columns[col], smallest->pos);

        /// Put the cursor back unless its block has been fully consumed.
        if (!smallest->isLast())
        {
            smallest->next();
            queue.push(smallest);
        }

        ++total_merged_rows;
        const bool limit_reached = limit != 0 && total_merged_rows == limit;
        if (limit_reached)
        {
            Block last = make_result();
            blocks.clear();
            return last;
        }

        if (++rows_in_block == max_merged_block_size)
            return make_result();
    }

    if (rows_in_block != 0)
        return make_result();

    return {};
}

/// Passes the `limit` member to `buffer.fmtAppend` with the pattern ", limit = {}".
void MergeSortingBlockInputStream::appendInfo(FmtBuffer & buffer) const
{
buffer.fmtAppend(", limit = {}", limit);
}

} // namespace DB
60 changes: 1 addition & 59 deletions dbms/src/DataStreams/MergeSortingBlockInputStream.h
Original file line number Diff line number Diff line change
@@ -14,76 +14,18 @@

#pragma once

#include <Common/Logger.h>
#include <Core/SortCursor.h>
#include <Core/SortDescription.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <IO/CompressedReadBuffer.h>
#include <IO/ReadBufferFromFile.h>
#include <Poco/TemporaryFile.h>
#include <common/logger_useful.h>

#include <queue>


namespace DB
{
/** Merges a stream of blocks, each sorted separately, into a stream of blocks sorted as a whole.
* If there is too much data to sort, external sorting with temporary files may be used.
*/

/** Part of implementation. Merges an array of ready (already read from somewhere) blocks.
* Returns the result of the merge as a stream of blocks with not more than 'max_merged_block_size' rows in each.
* Exception: if only one non-empty block is left, readImpl() returns that block as is.
*/
class MergeSortingBlocksBlockInputStream : public IProfilingBlockInputStream
{
static constexpr auto NAME = "MergeSortingBlocks";

public:
/// limit - if not 0, allowed to return just first 'limit' rows in sorted order.
/// blocks_ is stored by reference and modified by the stream (empty blocks are removed,
/// and it is cleared when reading finishes through the single-block or limit path).
/// blocks_ must not be empty: the constructor reads blocks_.at(0) to build the header.
MergeSortingBlocksBlockInputStream(
Blocks & blocks_,
SortDescription & description_,
const String & req_id,
size_t max_merged_block_size_,
size_t limit_ = 0);

String getName() const override { return NAME; }

bool isGroupedOutput() const override { return true; }
bool isSortedOutput() const override { return true; }
const SortDescription & getSortDescription() const override { return description; }

Block getHeader() const override { return header; }

protected:
Block readImpl() override;

private:
/// Reference to the blocks passed to the constructor.
Blocks & blocks;
/// Empty clone of the first block passed to the constructor.
Block header;
/// Copy of the sort description passed to the constructor.
SortDescription description;
/// mergeImpl returns a block once it has collected this many rows.
size_t max_merged_block_size;
/// 0 means no limit.
size_t limit;
/// Rows produced by mergeImpl over all calls; compared against 'limit'.
size_t total_merged_rows = 0;

/// One cursor per non-empty input block; the queues below hold pointers into this vector.
using CursorImpls = std::vector<SortCursorImpl>;
CursorImpls cursors;

/// Set in the constructor if any cursor reports has_collation; selects which queue is used.
bool has_collation = false;

/// Only one of the two queues is filled, depending on has_collation.
std::priority_queue<SortCursor> queue;
std::priority_queue<SortCursorWithCollation> queue_with_collation;

/** Two different cursors are supported - with and without Collation.
* Templates are used (instead of virtual functions in SortCursor) for zero-overhead.
*/
template <typename TSortCursor>
Block mergeImpl(std::priority_queue<TSortCursor> & queue);

/// Created in the constructor via Logger::get(req_id).
LoggerPtr log;
};

class MergeSortingBlockInputStream : public IProfilingBlockInputStream
{
static constexpr auto NAME = "MergeSorting";
118 changes: 118 additions & 0 deletions dbms/src/DataStreams/MergeSortingBlocksBlockInputStream.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <DataStreams/MergeSortingBlocksBlockInputStream.h>

namespace DB
{
/** Prepares the merge of already-read blocks.
  *
  * blocks_               - blocks to merge; held by reference and modified in place
  *                         (blocks without rows are dropped from it). Must contain at
  *                         least one element, because the header is cloned from blocks_.at(0).
  * description_          - sort description; copied into the stream.
  * req_id                - identifier passed to Logger::get to create the logger.
  * max_merged_block_size_ - row count at which mergeImpl hands out a block.
  * limit_                - if not 0, merging stops once that many rows have been produced in total.
  */
MergeSortingBlocksBlockInputStream::MergeSortingBlocksBlockInputStream(
    Blocks & blocks_,
    SortDescription & description_,
    const String & req_id,
    size_t max_merged_block_size_,
    size_t limit_)
    : blocks(blocks_)
    , header(blocks.at(0).cloneEmpty())
    , description(description_)
    , max_merged_block_size(max_merged_block_size_)
    , limit(limit_)
    , log(Logger::get(req_id))
{
    /// Collect the blocks that actually hold rows and create one cursor per such block.
    Blocks blocks_with_rows;
    for (const auto & candidate : blocks)
    {
        if (candidate.rows() != 0)
        {
            blocks_with_rows.push_back(candidate);
            cursors.emplace_back(candidate, description);
            if (cursors.back().has_collation)
                has_collation = true;
        }
    }

    /// The referenced vector now holds only the non-empty blocks.
    blocks.swap(blocks_with_rows);

    /// Only the queue matching the cursor flavour is filled; the other one stays empty.
    if (has_collation)
    {
        for (auto & cursor_impl : cursors)
            queue_with_collation.push(SortCursorWithCollation(&cursor_impl));
        return;
    }

    for (auto & cursor_impl : cursors)
        queue.push(SortCursor(&cursor_impl));
}


/** Returns the next merged block, or an empty Block when nothing is left.
  * A single remaining block is returned as is, without going through the merge.
  */
Block MergeSortingBlocksBlockInputStream::readImpl()
{
    switch (blocks.size())
    {
    case 0:
        return Block();
    case 1:
    {
        /// Nothing to merge with: hand the only block out and mark the stream as finished.
        Block single = blocks[0];
        blocks.clear();
        return single;
    }
    default:
        break;
    }

    if (has_collation)
        return mergeImpl<SortCursorWithCollation>(queue_with_collation);

    return mergeImpl<SortCursor>(queue);
}


/** Pops rows from the cursor queue in sorted order and appends them to a fresh set of columns.
  * Returns as soon as either 'max_merged_block_size' rows were collected in this call,
  * or 'limit' rows (if limit is not 0) were collected over all calls; in the latter case
  * 'blocks' is cleared so that the next readImpl() reports the end of the stream.
  * Returns an empty Block if the queue was already exhausted.
  */
template <typename TSortCursor>
Block MergeSortingBlocksBlockInputStream::mergeImpl(std::priority_queue<TSortCursor> & queue)
{
    const size_t column_count = blocks[0].columns();

    MutableColumns out_columns = blocks[0].cloneEmptyColumns();
    /// TODO: reserve (in each column)

    /// Wraps the collected columns into a block with the structure of the first input block.
    /// Invoked at most once per call because it moves the columns out.
    auto make_result = [&] {
        return blocks[0].cloneWithColumns(std::move(out_columns));
    };

    size_t rows_in_block = 0;
    while (!queue.empty())
    {
        TSortCursor smallest = queue.top();
        queue.pop();

        for (size_t col = 0; col != column_count; ++col)
            out_columns[col]->insertFrom(*smallest->all_columns[col], smallest->pos);

        /// Put the cursor back unless its block has been fully consumed.
        if (!smallest->isLast())
        {
            smallest->next();
            queue.push(smallest);
        }

        ++total_merged_rows;
        const bool limit_reached = limit != 0 && total_merged_rows == limit;
        if (limit_reached)
        {
            Block last = make_result();
            blocks.clear();
            return last;
        }

        if (++rows_in_block == max_merged_block_size)
            return make_result();
    }

    if (rows_in_block != 0)
        return make_result();

    return {};
}
} // namespace DB
81 changes: 81 additions & 0 deletions dbms/src/DataStreams/MergeSortingBlocksBlockInputStream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Core/SortCursor.h>
#include <Core/SortDescription.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Common/Logger.h>

#include <queue>

namespace DB
{
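/** Merges a stream of blocks, each sorted separately, into a stream of blocks sorted as a whole.
* If there is too much data to sort, external sorting with temporary files may be used.
* (This describes the merge-sorting operation as a whole; the class below is one part of its implementation.)
*/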

/** Part of implementation. Merges an array of ready (already read from somewhere) blocks.
* Returns the result of the merge as a stream of blocks with not more than 'max_merged_block_size' rows in each.
* Exception: if only one non-empty block is left, readImpl() returns that block as is.
*/
class MergeSortingBlocksBlockInputStream : public IProfilingBlockInputStream
{
static constexpr auto NAME = "MergeSortingBlocks";

public:
/// limit - if not 0, allowed to return just first 'limit' rows in sorted order.
/// blocks_ is stored by reference and modified by the stream (empty blocks are removed,
/// and it is cleared when reading finishes through the single-block or limit path).
/// blocks_ must not be empty: the constructor reads blocks_.at(0) to build the header.
MergeSortingBlocksBlockInputStream(
Blocks & blocks_,
SortDescription & description_,
const String & req_id,
size_t max_merged_block_size_,
size_t limit_ = 0);

String getName() const override { return NAME; }

bool isGroupedOutput() const override { return true; }
bool isSortedOutput() const override { return true; }
const SortDescription & getSortDescription() const override { return description; }

Block getHeader() const override { return header; }

protected:
Block readImpl() override;

private:
/// Reference to the blocks passed to the constructor.
Blocks & blocks;
/// Empty clone of the first block passed to the constructor.
Block header;
/// Copy of the sort description passed to the constructor.
SortDescription description;
/// mergeImpl returns a block once it has collected this many rows.
size_t max_merged_block_size;
/// 0 means no limit.
size_t limit;
/// Rows produced by mergeImpl over all calls; compared against 'limit'.
size_t total_merged_rows = 0;

/// One cursor per non-empty input block; the queues below hold pointers into this vector.
using CursorImpls = std::vector<SortCursorImpl>;
CursorImpls cursors;

/// Set in the constructor if any cursor reports has_collation; selects which queue is used.
bool has_collation = false;

/// Only one of the two queues is filled, depending on has_collation.
std::priority_queue<SortCursor> queue;
std::priority_queue<SortCursorWithCollation> queue_with_collation;

/** Two different cursors are supported - with and without Collation.
* Templates are used (instead of virtual functions in SortCursor) for zero-overhead.
*/
template <typename TSortCursor>
Block mergeImpl(std::priority_queue<TSortCursor> & queue);

/// Created in the constructor via Logger::get(req_id).
LoggerPtr log;
};
} // namespace DB
53 changes: 52 additions & 1 deletion dbms/src/Flash/tests/gtest_topn_executor.cpp
Original file line number Diff line number Diff line change
@@ -19,7 +19,6 @@ namespace DB
{
namespace tests
{

class ExecutorTopNTestRunner : public DB::tests::ExecutorTest
{
public:
@@ -45,6 +44,38 @@ class ExecutorTopNTestRunner : public DB::tests::ExecutorTest
toNullableVec<String>(col_name[1], col_gender),
toNullableVec<String>(col_name[2], col_country),
toNullableVec<Int32>(col_name[3], col_salary)});

/// table with 200 rows
{
// with 15 types of key.
std::vector<std::optional<TypeTraits<int>::FieldType>> key(200);
for (size_t i = 0; i < 200; ++i)
key[i] = i % 15;
context.addMockTable(
{"test_db", "big_table_1"},
{{"key", TiDB::TP::TypeLong}},
{toNullableVec<Int32>("key", key)});
}
{
// with 200 types of key.
std::vector<std::optional<TypeTraits<int>::FieldType>> key(200);
for (size_t i = 0; i < 200; ++i)
key[i] = i;
context.addMockTable(
{"test_db", "big_table_2"},
{{"key", TiDB::TP::TypeLong}},
{toNullableVec<Int32>("key", key)});
}
{
// with 1 types of key.
std::vector<std::optional<TypeTraits<int>::FieldType>> key(200);
for (size_t i = 0; i < 200; ++i)
key[i] = 0;
context.addMockTable(
{"test_db", "big_table_3"},
{{"key", TiDB::TP::TypeLong}},
{toNullableVec<Int32>("key", key)});
}
}

std::shared_ptr<tipb::DAGRequest> buildDAGRequest(const String & table_name, const String & col_name, bool is_desc, int limit_num)
@@ -214,5 +245,25 @@ try
}
CATCH

/// Runs an ascending TopN on "key" over each of the three 200-row mock tables,
/// with limits of zero, one, below, equal to and above the row count.
TEST_F(ExecutorTopNTestRunner, BigTable)
try
{
    const std::vector<String> tables{"big_table_1", "big_table_2", "big_table_3"};
    /// Loop-invariant, so built once instead of once per table.
    const std::vector<size_t> limits{0, 1, 100, 200, 300};
    for (const auto & table : tables)
    {
        for (auto limit_num : limits)
        {
            auto request = context
                               .scan("test_db", table)
                               .topN("key", false, limit_num)
                               .build(context);
            /// The result of executeStreams(request, 1) serves as the expected output.
            /// NOTE(review): the second argument is presumably the concurrency - confirm against ExecutorTest.
            auto expect = executeStreams(request, 1);
            executeAndAssertColumnsEqual(request, expect);
        }
    }
}
CATCH

} // namespace tests
} // namespace DB