Skip to content

Commit

Permalink
Fix test utils. (#6348)
Browse files Browse the repository at this point in the history
ref #6296
  • Loading branch information
JinheLin authored Nov 24, 2022
1 parent e7be8e7 commit b843a9f
Showing 1 changed file with 54 additions and 35 deletions.
89 changes: 54 additions & 35 deletions dbms/src/TestUtils/InputStreamTestUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
#include <Core/ColumnWithTypeAndName.h>
#include <DataStreams/IBlockInputStream.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/DeltaMergeHelpers.h>
#include <TestUtils/FunctionTestUtils.h>
#include <TestUtils/InputStreamTestUtils.h>
#include <fmt/format.h>

namespace DB
{
Expand Down Expand Up @@ -386,12 +388,14 @@ ::testing::AssertionResult UnorderedInputStreamVSBlockUnrestrictlyCompareColumns
Block expect_block(columns);
expect_block.checkNumberOfRows(); // check the input

// Blocks can be unordered when read-thread-pool enabled.
// So read all blocks and sort them by handle column or column at position 0.
// Blocks can be unordered when read-thread-pool, fast-scan, or bitmap-filter (currently not supported) is enabled.
// In particular, when fast-scan or bitmap-filter is enabled, not only can blocks arrive out of order, but the rows inside a block can be unordered as well.
// So read all blocks and sort them by handle before the comparison.
size_t num_rows_expect = expect_block.rows();
size_t num_rows_read = 0;
std::vector<Block> blocks;
stream->readPrefix();
// Read all blocks.
while (Block read_block = stream->read())
{
num_rows_read += read_block.rows();
Expand All @@ -413,52 +417,67 @@ ::testing::AssertionResult UnorderedInputStreamVSBlockUnrestrictlyCompareColumns
blocks.emplace_back(std::move(read_block));
}
stream->readSuffix();
Block blk;
// Sort rows by handle.
if (!blocks.empty())
{
blk = blocks.front().cloneEmpty();
// First, merge all blocks into one.
auto mut_cols = blk.cloneEmptyColumns();
for (const auto & b : blocks)
{
for (size_t i = 0; i < b.columns(); i++)
{
const auto & col = *b.getByPosition(i).column;
mut_cols[i]->insertRangeFrom(col, 0, col.size());
}
}

auto cmp_blk = [](const Block & a, const Block & b) {
const auto & col_a = a.has(EXTRA_HANDLE_COLUMN_NAME) ? a.getByName(EXTRA_HANDLE_COLUMN_NAME) : a.getByPosition(0);
const auto & col_b = b.has(EXTRA_HANDLE_COLUMN_NAME) ? b.getByName(EXTRA_HANDLE_COLUMN_NAME) : b.getByPosition(0);
if (col_a.column->empty() || col_b.column->empty())
// Sort all columns by handle. Assume position 0 is the handle column in these tests.
auto & handle_col = mut_cols[0];
std::vector<size_t> ids;
for (size_t i = 0; i < handle_col->size(); i++)
{
return false;
ids.push_back(i);
}
const auto & field_a = (*col_a.column)[0];
const auto & field_b = (*col_b.column)[0];
return field_a < field_b;
};
std::sort(blocks.begin(), blocks.end(), cmp_blk);

size_t start_offset = 0;
for (const auto & read_block : blocks)
{
for (size_t col_idx = 0; col_idx < colnames.size(); ++col_idx)
std::sort(ids.begin(), ids.end(), [&](size_t a, size_t b) {
return handle_col->getDataAt(a) < handle_col->getDataAt(b);
});
auto sorted_cols = blk.cloneEmptyColumns();
for (size_t pos = 0; pos < sorted_cols.size(); pos++)
{
const auto & col_name = colnames[col_idx];
// Copy the [start_offset, read_block.rows()) of `expect_block`
const auto & expect_full_col = expect_block.getByPosition(col_idx);
auto expect_col = expect_full_col.cloneEmpty();
auto column_data = expect_col.type->createColumn();
column_data->insertRangeFrom(*expect_full_col.column, start_offset, read_block.rows());
expect_col.column = std::move(column_data);

const auto & actual_col = read_block.getByName(col_name);
if (auto res = columnEqual(expect_col, actual_col); !res)
auto & sorted_col = sorted_cols[pos];
ColumnPtr col = std::move(mut_cols[pos]);
for (auto id : ids)
{
auto expect_expr = fmt::format("expect block: {}", getColumnsContent(expect_block.getColumnsWithTypeAndName(), start_offset, start_offset + read_block.rows()));
Block actual_block_to_cmp;
for (const auto & col_name : colnames)
actual_block_to_cmp.insert(read_block.getByName(col_name));
auto actual_expr = fmt::format("actual block: {}", getColumnsContent(actual_block_to_cmp.getColumnsWithTypeAndName()));
return res << fmt::format("\n details: [column={}] [prev_nrows={}] [cur_nrows={}]:\n {}\n {}", col_name, start_offset, start_offset + read_block.rows(), expect_expr, actual_expr);
sorted_col->insertRangeFrom(*col, id, 1);
}
}
blk.setColumns(std::move(sorted_cols));
}

start_offset += read_block.rows();
for (size_t col_idx = 0; col_idx < colnames.size(); ++col_idx)
{
const auto & col_name = colnames[col_idx];
const auto & expect_full_col = expect_block.getByPosition(col_idx);
const auto & actual_col = blk.getByName(col_name);
if (auto res = columnEqual(expect_full_col, actual_col); !res)
{
auto expect_expr = fmt::format("expect block: {}", getColumnsContent(expect_block.getColumnsWithTypeAndName(), 0, blk.rows()));
Block actual_block_to_cmp;
for (const auto & name : colnames)
{
actual_block_to_cmp.insert(blk.getByName(name));
}
auto actual_expr = fmt::format("actual block: {}", getColumnsContent(actual_block_to_cmp.getColumnsWithTypeAndName()));
return res << fmt::format("\n details: [column={}] [prev_nrows={}] [cur_nrows={}]:\n {}\n {}", col_name, 0, blk.rows(), expect_expr, actual_expr);
}
}

if (num_rows_expect == num_rows_read)
return ::testing::AssertionSuccess();

// Less rows than expected
// Fewer rows than expected
auto reason = fmt::format(R"r( ({}).read() return num of rows
Which is: {}
the num rows of ({})
Expand Down

0 comments on commit b843a9f

Please sign in to comment.