
Commit

add ut
Signed-off-by: Lloyd-Pottiger <[email protected]>
Lloyd-Pottiger committed Mar 21, 2024
1 parent 3606c08 commit b3a0211
Showing 4 changed files with 135 additions and 59 deletions.
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.cpp
@@ -73,10 +73,10 @@ void BitmapFilter::set(const UInt32 * data, UInt32 size, const FilterPtr & f)
}
}

void BitmapFilter::set(UInt32 start, UInt32 limit)
void BitmapFilter::set(UInt32 start, UInt32 limit, bool value)
{
RUNTIME_CHECK(start + limit <= filter.size(), start, limit, filter.size());
std::fill(filter.begin() + start, filter.begin() + start + limit, true);
std::fill(filter.begin() + start, filter.begin() + start + limit, value);
}

bool BitmapFilter::get(IColumn::Filter & f, UInt32 start, UInt32 limit) const
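The new `value` parameter lets callers clear a range of the bitmap as well as set it. A minimal usage sketch, illustrative only and not part of this commit; the constructor taking an initial size and fill value is inferred from the new test further down:

// Illustrative sketch of the extended set() overload (assumptions noted inline).
UInt32 total_rows = 1000; // placeholder row count
auto bitmap_filter = std::make_shared<BitmapFilter>(total_rows, true); // all rows match initially
bitmap_filter->set(100, 50, false); // new overload: mark rows [100, 150) as not matching
bitmap_filter->set(120, 10);        // value defaults to true, so rows [120, 130) match again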
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.h
@@ -28,7 +28,7 @@ class BitmapFilter
void set(BlockInputStreamPtr & stream);
void set(const ColumnPtr & col, const FilterPtr & f);
void set(const UInt32 * data, UInt32 size, const FilterPtr & f);
void set(UInt32 start, UInt32 limit);
void set(UInt32 start, UInt32 limit, bool value = true);
// If it returns true, all data matches and the filter is not filled.
bool get(IColumn::Filter & f, UInt32 start, UInt32 limit) const;
// filter[start, limit] & f -> f
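For context, the two helpers documented above are typically consumed as follows. This is a hedged sketch rather than code from this commit; `start` and `limit` are placeholder values, and the rangeAnd() call mirrors its use in the new test:

// get(): returns true when every row in [start, start + limit) matches,
// in which case `f` is left untouched and no per-row filter is needed.
IColumn::Filter f(limit, 1);
if (!bitmap_filter->get(f, start, limit))
{
    // Only some rows match; `f` now holds the per-row 0/1 flags.
}

// rangeAnd(): filter[start, limit] & f -> f, i.e. combine the bitmap
// with an existing per-row filter in place.
bitmap_filter->rangeAnd(f, start, limit);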
27 changes: 19 additions & 8 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
@@ -213,7 +213,7 @@ Block DMFileReader::readWithFilter(const IColumn::Filter & filter)

MutableColumns columns;
columns.reserve(read_columns.size());
size_t total_passed_count = countBytesInFilter(filter, 0, filter.size());
size_t total_passed_count = countBytesInFilter(filter);
for (const auto & cd : read_columns)
{
auto col = cd.type->createColumn();
@@ -222,17 +222,18 @@ Block DMFileReader::readWithFilter(const IColumn::Filter & filter)
}

size_t offset = 0;
// reset next_pack_id to start_pack_id
// reset next_pack_id to start_pack_id, next_row_offset to start_row_offset
next_pack_id = start_pack_id;
for (size_t i = start_pack_id; i < last_pack_id; ++i)
next_row_offset = start_row_offset;
for (size_t pack_id = start_pack_id; pack_id < last_pack_id; ++pack_id)
{
// When the next pack is not used or the pack is the last pack, call read() to read these packs and filter them
// For example:
// When next_pack_id_cp = use_packs.size() and use_packs[next_pack_id:next_pack_id_cp] = [true, true, false, true, true, true]
// The algorithm runs as follows:
// When i = next_pack_id + 2, call read() to read {next_pack_id, next_pack_id + 1}th packs
// When i = next_pack_id + 5, call read() to read {next_pack_id + 3, next_pack_id + 4, next_pack_id + 5}th packs
if (use_packs[i] && (i + 1 == use_packs.size() || !use_packs[i + 1]))
if (use_packs[pack_id] && (pack_id + 1 == use_packs.size() || !use_packs[pack_id + 1]))
{
Block block = read();
size_t rows = block.rows();
@@ -251,10 +252,21 @@ Block DMFileReader::readWithFilter(const IColumn::Filter & filter)
columns[i]->insertDisjunctFrom(*block.getByPosition(i).column, positions);
}
}
else
{
for (size_t i = 0; i < block.columns(); ++i)
{
columns[i]->insertRangeFrom(
*block.getByPosition(i).column,
0,
block.getByPosition(i).column->size());
}
}
offset += rows;
}
else if (!use_packs[i])
else if (!use_packs[pack_id])
{
offset += pack_stats[i].rows;
offset += pack_stats[pack_id].rows;
}
}

@@ -263,9 +275,8 @@ Block DMFileReader::readWithFilter(const IColumn::Filter & filter)
if (last_pack_id < use_packs.size())
use_packs[last_pack_id] = next_pack_id_use_packs_cp;

Block res;
Block res = getHeader().cloneWithColumns(std::move(columns));
res.setStartOffset(start_row_offset);
res.setColumns(std::move(columns));
return res;
}

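The comments in readWithFilter describe reading used packs in contiguous runs: read() is triggered at the last used pack of each run. A minimal standalone sketch of that grouping rule (illustrative only; the real loop also applies the caller's filter and tracks row offsets):

#include <cstddef>
#include <utility>
#include <vector>

// Returns one [first_pack, last_pack] pair per read() call.
std::vector<std::pair<size_t, size_t>> groupUsedPackRuns(const std::vector<bool> & use_packs)
{
    std::vector<std::pair<size_t, size_t>> runs;
    size_t run_start = 0;
    bool in_run = false;
    for (size_t pack_id = 0; pack_id < use_packs.size(); ++pack_id)
    {
        if (use_packs[pack_id] && !in_run)
        {
            run_start = pack_id;
            in_run = true;
        }
        // A run ends at a used pack that is either the final pack overall
        // or is followed by an unused pack.
        if (use_packs[pack_id] && (pack_id + 1 == use_packs.size() || !use_packs[pack_id + 1]))
        {
            runs.emplace_back(run_start, pack_id);
            in_run = false;
        }
    }
    return runs;
}

For use_packs = {true, true, false, true, true, true} this yields {{0, 1}, {3, 5}}, matching the example in the comment above.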
@@ -15,6 +15,7 @@
#include <Columns/ColumnsCommon.h>
#include <Common/Logger.h>
#include <Storages/DeltaMerge/Filter/RSOperator.h>
#include <Storages/DeltaMerge/LateMaterializationBlockInputStream.h>
#include <Storages/DeltaMerge/RowKeyOrderedBlockInputStream.h>
#include <Storages/DeltaMerge/tests/gtest_segment_test_basic.h>
#include <Storages/DeltaMerge/tests/gtest_segment_util.h>
@@ -27,6 +28,62 @@
namespace DB::DM::tests
{

class MockFilterBlockInputStream : public IProfilingBlockInputStream
{
public:
explicit MockFilterBlockInputStream(const BlockInputStreamPtr & input_)
: input(input_)
, e(time(nullptr))
{}

String getName() const override { return "MockFilter"; }

Block getHeader() const override { return input->getHeader(); }

Block readImpl() override
{
FilterPtr filter_ignored;
return readImpl(filter_ignored, false);
}

Block readImpl(FilterPtr & res_filter, bool return_filter) override
{
assert(return_filter);
auto blk = input->read();
if (!blk)
return {};

filter.resize(blk.rows());
res_filter = &filter;
size_t mode = e() % 3;
if (mode == 0)
{
std::fill(filter.begin(), filter.end(), 0);
}
else if (mode == 1)
{
std::fill(filter.begin(), filter.end(), 1);
}
else
{
std::transform(filter.begin(), filter.end(), filter.begin(), [&e = e](auto) {
return e() % 8192 == 0 ? 1 : 0;
});
filter[e() % blk.rows()] = 1; // should not be all 0.
}
total_filter.insert(total_filter.end(), filter.begin(), filter.end());
return blk;
}

public:
IColumn::Filter filter{};
IColumn::Filter total_filter{};

private:
BlockInputStreamPtr input;
std::default_random_engine e;
};

class SkippableBlockInputStreamTest : public SegmentTestBasic
{
protected:
@@ -82,8 +139,10 @@ class SkippableBlockInputStreamTest : public SegmentTestBasic
}

auto [segment, snapshot] = getSegmentForRead(SEG_ID);
ColumnDefines columns_to_read
= {getExtraHandleColumnDefine(options.is_common_handle), getVersionColumnDefine()};
ColumnDefines columns_to_read = {
getExtraHandleColumnDefine(options.is_common_handle),
getVersionColumnDefine(),
};

auto stream = getInputStream(segment, snapshot, columns_to_read, read_ranges);

@@ -126,8 +185,10 @@ class SkippableBlockInputStreamTest : public SegmentTestBasic
}

auto [segment, snapshot] = getSegmentForRead(SEG_ID);
ColumnDefines columns_to_read
= {getExtraHandleColumnDefine(options.is_common_handle), getVersionColumnDefine()};
ColumnDefines columns_to_read = {
getExtraHandleColumnDefine(options.is_common_handle),
getVersionColumnDefine(),
};

auto stream1 = getInputStream(segment, snapshot, columns_to_read, read_ranges);
auto stream2 = getInputStream(segment, snapshot, columns_to_read, read_ranges);
@@ -166,60 +227,64 @@ class SkippableBlockInputStreamTest : public SegmentTestBasic
}

auto [segment, snapshot] = getSegmentForRead(SEG_ID);
ColumnDefines columns_to_read
= {getExtraHandleColumnDefine(options.is_common_handle), getVersionColumnDefine()};
ColumnDefines columns_to_read = {
getExtraHandleColumnDefine(options.is_common_handle),
getVersionColumnDefine(),
};

auto stream = getInputStream(segment, snapshot, columns_to_read, read_ranges);
BlockInputStreamPtr stream = getInputStream(segment, snapshot, columns_to_read, read_ranges);
BlockInputStreamPtr filter_cloumn_stream = std::make_shared<MockFilterBlockInputStream>(stream);
auto rest_column_stream = getInputStream(segment, snapshot, columns_to_read, read_ranges);

stream->readPrefix();
std::vector<Block> expected_blks;
for (auto blk = stream->read(); blk; blk = stream->read())
size_t total_rows = snapshot->stable->getRows() + snapshot->delta->getRows();
auto bitmap_filter = std::make_shared<BitmapFilter>(total_rows, 1);
std::default_random_engine e(time(nullptr));
for (size_t i = 0; i < 10; ++i)
{
expected_blks.push_back(std::move(blk));
size_t start = e() % total_rows;
size_t limit = e() % (total_rows - start);
bitmap_filter->set(start, limit, false);
}
stream->readSuffix();

stream = getInputStream(segment, snapshot, columns_to_read, read_ranges);

std::default_random_engine e(time(nullptr));

size_t offset = 0;
stream->readPrefix();
for (auto & eblk : expected_blks)
auto late_materialization_stream = std::make_shared<LateMaterializationBlockInputStream>(
columns_to_read,
"",
filter_cloumn_stream,
rest_column_stream,
bitmap_filter,
"test");
late_materialization_stream->readPrefix();
auto normal_stream = getInputStream(segment, snapshot, columns_to_read, read_ranges);
normal_stream->readPrefix();
auto filter_stream = std::dynamic_pointer_cast<MockFilterBlockInputStream>(filter_cloumn_stream);
while (true)
{
if (e() % 3 == 0)
{
offset += eblk.rows();
size_t skipped_rows = stream->skipNextBlock();
ASSERT_EQ(skipped_rows, eblk.rows());
}
else if (e() % 3 == 1)
auto blk1 = late_materialization_stream->read();
if (!blk1)
break;
Block blk2;
while (!blk2)
{
IColumn::Filter filter(eblk.rows(), 1);
std::transform(filter.begin(), filter.end(), filter.begin(), [&e](auto) {
return e() % 8192 == 0 ? 1 : 0;
});
filter[e() % eblk.rows()] = 1; // should not be all 0.
auto blk = stream->readWithFilter(filter);
ASSERT_EQ(offset, blk.startOffset());
ASSERT_EQ(blk.rows(), countBytesInFilter(filter));
offset += eblk.rows();
for (auto & col : eblk)
blk2 = normal_stream->read();
auto & filter = filter_stream->total_filter;
IColumn::Filter block_filter(
filter.cbegin() + blk2.startOffset(),
filter.cbegin() + blk2.startOffset() + blk2.rows());
bitmap_filter->rangeAnd(block_filter, blk2.startOffset(), blk2.rows());
size_t passed_count = countBytesInFilter(block_filter);
if (passed_count == 0)
{
col.column = col.column->filter(filter, -1);
blk2 = {};
continue;
}
for (auto & col : blk2)
{
col.column = col.column->filter(block_filter, passed_count);
}
ASSERT_BLOCK_EQ(eblk, blk);
}
else
{
auto blk = stream->read();
ASSERT_EQ(offset, blk.startOffset());
offset += blk.rows();
ASSERT_BLOCK_EQ(eblk, blk);
}
ASSERT_BLOCK_EQ(blk1, blk2);
}
ASSERT_BLOCK_EQ(stream->read(), Block{});
stream->readSuffix();
late_materialization_stream->readSuffix();
normal_stream->readSuffix();
}

void writeSegment(const SegDataUnit & unit)
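MockFilterBlockInputStream models the filter-column stage of late materialization: each block it returns comes with a pointer to a per-block filter, and total_filter accumulates those filters so the test can re-derive the expected rows. Roughly, a downstream consumer of such a stream looks like the sketch below. This is a hedged illustration, not code from this commit; it assumes a public read(FilterPtr &, bool) wrapper that forwards to the readImpl(FilterPtr &, bool) overload implemented by the mock, and LateMaterializationBlockInputStream additionally ANDs the filter with the bitmap and reads the remaining columns from the second stream:

// Hedged consumer sketch for a stream that can return a per-block filter.
FilterPtr filter = nullptr;
for (Block block = stream->read(filter, /*return_filter=*/true); block;
     block = stream->read(filter, true))
{
    size_t passed = filter ? countBytesInFilter(*filter) : block.rows();
    if (passed == 0)
        continue; // every row of this block was filtered out
    if (filter && passed != block.rows())
    {
        for (auto & col : block)
            col.column = col.column->filter(*filter, passed);
    }
    // ... hand the surviving rows to the next stage ...
}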
