Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storages: Skip filter if all data match in Min-Max index #9318

Merged
merged 14 commits into from
Aug 22, 2024
11 changes: 9 additions & 2 deletions dbms/src/Core/Block.cpp
Original file line number Diff line number Diff line change
@@ -542,6 +542,7 @@ Block hstackBlocks(Blocks && blocks, const Block & header)

Block res = header.cloneEmpty();
size_t num_rows = blocks.front().rows();
auto rs_result = DM::RSResult::All;
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved
for (const auto & block : blocks)
{
RUNTIME_CHECK_MSG(block.rows() == num_rows, "Cannot hstack blocks with different number of rows");
@@ -552,7 +553,9 @@ Block hstackBlocks(Blocks && blocks, const Block & header)
res.getByName(elem.name).column = std::move(elem.column);
}
}
rs_result = rs_result && block.getRSResult();
}
res.setRSResult(rs_result);

return res;
}
@@ -579,7 +582,7 @@ Block vstackBlocks(Blocks && blocks)

auto & first_block = blocks.front();
MutableColumns dst_columns(first_block.columns());

auto rs_result = first_block.getRSResult();
for (size_t i = 0; i < first_block.columns(); ++i)
{
dst_columns[i] = (*std::move(first_block.getByPosition(i).column)).mutate();
@@ -611,6 +614,7 @@ Block vstackBlocks(Blocks && blocks)
{
dst_columns[idx]->insertRangeFrom(*blocks[i].getByPosition(idx).column, 0, blocks[i].rows());
}
rs_result = rs_result && blocks[i].getRSResult();
}
}

@@ -623,7 +627,9 @@ Block vstackBlocks(Blocks && blocks)
total_allocated_bytes == updated_total_allocated_bytes,
"vstackBlock's reserve does not reserve enough bytes");
}
return first_block.cloneWithColumns(std::move(dst_columns));
auto res = first_block.cloneWithColumns(std::move(dst_columns));
res.setRSResult(rs_result);
return res;
}

Block popBlocksListFront(BlocksList & blocks)
@@ -727,6 +733,7 @@ void Block::swap(Block & other) noexcept
std::swap(info, other.info);
data.swap(other.data);
index_by_name.swap(other.index_by_name);
std::swap(rs_result, other.rs_result);
}


7 changes: 6 additions & 1 deletion dbms/src/Core/Block.h
Original file line number Diff line number Diff line change
@@ -18,13 +18,13 @@
#include <Core/ColumnWithTypeAndName.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <Core/NamesAndTypes.h>
#include <Storages/DeltaMerge/Index/RSResult.h>

#include <initializer_list>
#include <list>
#include <map>
#include <vector>


namespace DB
{
/** Container for set of columns for bunch of rows in memory.
@@ -52,6 +52,8 @@ class Block
// Only used for calculating MVCC-bitmap-filter.
ColumnPtr segment_row_id_col;

DM::RSResult rs_result = DM::RSResult::Some;

public:
BlockInfo info;

@@ -163,6 +165,9 @@ class Block
void setSegmentRowIdCol(ColumnPtr && col) { segment_row_id_col = col; }
ColumnPtr segmentRowIdCol() const { return segment_row_id_col; }

void setRSResult(DM::RSResult r) { rs_result = r; }
DM::RSResult getRSResult() const { return rs_result; }

private:
void eraseImpl(size_t position);
void initializeIndexByName();
70 changes: 70 additions & 0 deletions dbms/src/Core/tests/gtest_block.cpp
Original file line number Diff line number Diff line change
@@ -19,10 +19,16 @@
#include <DataStreams/materializeBlock.h>
#include <Functions/FunctionHelpers.h>
#include <IO/Encryption/MockKeyManager.h>
#include <Storages/DeltaMerge/Index/RSResult.h>
#include <Storages/DeltaMerge/tests/DMTestEnv.h>
#include <TestUtils/ColumnGenerator.h>
#include <TestUtils/FunctionTestUtils.h>
#include <TestUtils/TiFlashTestBasic.h>

#include <numeric>

using namespace DB::DM;
using namespace DB::DM::tests;

namespace DB
{
@@ -166,5 +172,69 @@ try
}
CATCH

template <typename F>
void permutationRSResults(F && check)
{
const RSResults candidate_rs_results{RSResult::All, RSResult::AllNull, RSResult::Some, RSResult::SomeNull};
for (auto a : candidate_rs_results)
for (auto b : candidate_rs_results)
for (auto c : candidate_rs_results)
for (auto d : candidate_rs_results)
check({a, b, c, d});
}

RSResult logicalAnd(const RSResults & rs_results)
{
return std::accumulate(rs_results.cbegin(), rs_results.cend(), RSResult::All, std::logical_and<>{});
}

TEST_F(BlockTest, VstackBlocksRSResult)
try
{
auto check_vstack_blocks = [](const RSResults & rs_results) {
Blocks blocks;
size_t start = 0;
constexpr size_t num_rows = 10;
for (auto rs_result : rs_results)
{
blocks.push_back(DMTestEnv::prepareSimpleWriteBlock(start, start + num_rows, false));
blocks.back().setRSResult(rs_result);
start += num_rows;
}
auto b = vstackBlocks(std::move(blocks));
ASSERT_EQ(b.getRSResult(), logicalAnd(rs_results));
};
permutationRSResults(std::move(check_vstack_blocks));
}
CATCH

TEST_F(BlockTest, HstackBlocksRSResult)
try
{
auto create_column_defines = [](int count) {
ColumnDefines column_defines;
for (int i = 0; i < count; ++i)
column_defines.emplace_back(i + 1, std::to_string(i), std::make_shared<DataTypeInt64>());
return column_defines;
};
auto check_hstack_blocks = [&](const RSResults & rs_results) {
auto column_defines = create_column_defines(rs_results.size());
auto header = toEmptyBlock(column_defines);
Blocks blocks;
constexpr size_t num_rows = 10;
for (size_t i = 0; i < rs_results.size(); ++i)
{
const auto & cd = column_defines[i];
Block block;
block.insert(createColumn<Int64>(createSignedNumbers(0, num_rows), cd.name, cd.id));
block.setRSResult(rs_results[i]);
blocks.push_back(std::move(block));
}
auto b = hstackBlocks(std::move(blocks), header);
ASSERT_EQ(b.getRSResult(), logicalAnd(rs_results));
};
permutationRSResults(std::move(check_hstack_blocks));
}
CATCH
} // namespace tests
} // namespace DB
10 changes: 10 additions & 0 deletions dbms/src/DataStreams/FilterTransformAction.cpp
Original file line number Diff line number Diff line change
@@ -74,6 +74,16 @@ bool FilterTransformAction::transform(Block & block, FilterPtr & res_filter, boo
if (unlikely(!block))
return true;

if (block.getRSResult().allMatch())
{
block.insert(
filter_column,
header.safeGetByPosition(filter_column)); // Make some checks on block structure happy.
if (return_filter)
res_filter = nullptr;
return true;
}

expression->execute(block);

if (constant_filter_description.always_true)
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/ExpressionActions.cpp
Original file line number Diff line number Diff line change
@@ -402,7 +402,7 @@ void ExpressionAction::execute(Block & block) const
column.name = alias;
new_block.insert(std::move(column));
}

new_block.setRSResult(block.getRSResult());
Lloyd-Pottiger marked this conversation as resolved.
Show resolved Hide resolved
block.swap(new_block);

break;
9 changes: 9 additions & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp
Original file line number Diff line number Diff line change
@@ -82,6 +82,15 @@ void DMFilePackFilter::init(ReadTag read_tag)
pack_res.begin(),
[](RSResult a, RSResult b) { return a && b; });
}
else
{
// ColumnFileBig in DeltaValueSpace never pass a filter to DMFilePackFilter.
// Assume its filter always return Some.
std::transform(pack_res.cbegin(), pack_res.cend(), pack_res.begin(), [](RSResult a) {
return a && RSResult::Some;
});
}

auto [none_count, some_count, all_count, all_null_count] = countPackRes();
auto after_filter = some_count + all_count + all_null_count;
ProfileEvents::increment(ProfileEvents::DMFileFilterAftRoughSet, after_filter);
76 changes: 68 additions & 8 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
Original file line number Diff line number Diff line change
@@ -103,6 +103,7 @@ DMFileReader::DMFileReader(
{
col_data_cache = std::make_unique<ColumnSharingCacheMap>(path(), read_columns, log);
}
initAllMatchBlockInfo();
}

bool DMFileReader::getSkippedRows(size_t & skip_rows)
@@ -131,7 +132,7 @@ size_t DMFileReader::skipNextBlock()
}

// move forward next_pack_id and next_row_offset
const size_t read_rows = getReadRows();
const auto [read_rows, rs_result] = getReadRows();
if (read_rows == 0)
return 0;

@@ -142,25 +143,27 @@ size_t DMFileReader::skipNextBlock()

// Get the number of rows to read in the next block
// Move forward next_pack_id and next_row_offset
size_t DMFileReader::getReadRows()
std::pair<size_t, RSResult> DMFileReader::getReadRows()
{
const auto & pack_res = pack_filter.getPackResConst();
const size_t start_pack_id = next_pack_id;
// When read_one_pack_every_time is true, we can just read one pack every time.
// std::numeric_limits<size_t>::max() means no limit
const size_t read_pack_limit = read_one_pack_every_time ? 1 : std::numeric_limits<size_t>::max();
const size_t read_pack_limit = getReadPackLimit(start_pack_id);
const auto & pack_stats = dmfile->getPackStats();
size_t read_rows = 0;
auto last_pack_res = RSResult::All;
for (; next_pack_id < pack_res.size() && pack_res[next_pack_id].isUse() && read_rows < rows_threshold_per_read;
++next_pack_id)
{
if (next_pack_id - start_pack_id >= read_pack_limit)
break;
last_pack_res = last_pack_res && pack_res[next_pack_id];
read_rows += pack_stats[next_pack_id].rows;
}

next_row_offset += read_rows;
return read_rows;
if (read_tag == ReadTag::Query && last_pack_res.allMatch())
scan_context->rs_dmfile_read_with_all += next_pack_id - start_pack_id;
return {read_rows, last_pack_res};
}

Block DMFileReader::readWithFilter(const IColumn::Filter & filter)
@@ -179,7 +182,7 @@ Block DMFileReader::readWithFilter(const IColumn::Filter & filter)

size_t start_row_offset = next_row_offset;
size_t start_pack_id = next_pack_id;
const size_t read_rows = getReadRows();
const auto [read_rows, rs_result] = getReadRows();
RUNTIME_CHECK(read_rows == filter.size(), read_rows, filter.size());
size_t last_pack_id = next_pack_id;
{
@@ -269,6 +272,7 @@ Block DMFileReader::readWithFilter(const IColumn::Filter & filter)

Block res = getHeader().cloneWithColumns(std::move(columns));
res.setStartOffset(start_row_offset);
res.setRSResult(rs_result);
return res;
}

@@ -290,7 +294,7 @@ Block DMFileReader::read()

size_t start_pack_id = next_pack_id;
size_t start_row_offset = next_row_offset;
const size_t read_rows = getReadRows();
const auto [read_rows, rs_result] = getReadRows();
if (read_rows == 0)
return {};
addScannedRows(read_rows);
@@ -397,6 +401,7 @@ Block DMFileReader::read()

Block res(std::move(columns));
res.setStartOffset(start_row_offset);
res.setRSResult(rs_result);
return res;
}

@@ -703,4 +708,59 @@ void DMFileReader::addSkippedRows(UInt64 rows)
break;
}
}

void DMFileReader::initAllMatchBlockInfo()
{
const auto & pack_res = pack_filter.getPackResConst();
const auto & pack_stats = dmfile->getPackStats();

// Get continuous packs with RSResult::All
auto get_all_match_block = [&](size_t start_pack) {
size_t count = 0;
size_t rows = 0;
for (size_t i = start_pack; i < pack_res.size(); ++i)
{
if (!pack_res[i].allMatch() || rows >= rows_threshold_per_read)
break;

++count;
rows += pack_stats[i].rows;
}
return std::make_pair(count, rows);
};

for (size_t i = 0; i < pack_res.size();)
{
if (!pack_res[i].allMatch())
{
++i;
continue;
}
auto [pack_count, rows] = get_all_match_block(i);
// Do not read block too small, it may hurts performance
if (rows >= rows_threshold_per_read / 2)
all_match_block_infos.emplace(i, pack_count);
i += pack_count;
}
}

size_t DMFileReader::getReadPackLimit(size_t start_pack_id)
{
if (all_match_block_infos.empty() || read_one_pack_every_time)
return read_one_pack_every_time ? 1 : std::numeric_limits<size_t>::max();

const auto [next_all_match_block_start_pack_id, pack_count] = all_match_block_infos.front();
// Read packs with RSResult::All
if (next_all_match_block_start_pack_id == start_pack_id)
{
all_match_block_infos.pop();
return pack_count;
}
// Read packs until next_all_match_block_start_pack_id
RUNTIME_CHECK(
next_all_match_block_start_pack_id > start_pack_id,
next_all_match_block_start_pack_id,
start_pack_id);
return next_all_match_block_start_pack_id - start_pack_id;
}
} // namespace DB::DM
10 changes: 9 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFileReader.h
Original file line number Diff line number Diff line change
@@ -96,7 +96,7 @@ class DMFileReader
friend class tests::DMFileMetaV2Test;

private:
size_t getReadRows();
std::pair<size_t, RSResult> getReadRows();
ColumnPtr readExtraColumn(
const ColumnDefine & cd,
size_t start_pack_id,
@@ -125,6 +125,9 @@ class DMFileReader
void addScannedRows(UInt64 rows);
void addSkippedRows(UInt64 rows);

void initAllMatchBlockInfo();
size_t getReadPackLimit(size_t start_pack_id);

DMFilePtr dmfile;
ColumnDefines read_columns;
ColumnReadStreamMap column_streams;
@@ -169,6 +172,11 @@ class DMFileReader

// DataSharing
std::unique_ptr<ColumnSharingCacheMap> col_data_cache{};

// <start_pack, pack_count>
// Each pair object indicates several continuous packs with RSResult::All and will be read as a Block.
// It is sorted by start_pack.
std::queue<std::pair<size_t, size_t>> all_match_block_infos;
};

} // namespace DB::DM
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/ScanContext.cpp
Original file line number Diff line number Diff line change
@@ -111,6 +111,7 @@ String ScanContext::toJson() const
json->set("rs_pack_filter_some", rs_pack_filter_some.load());
json->set("rs_pack_filter_all", rs_pack_filter_all.load());
json->set("rs_pack_filter_all_null", rs_pack_filter_all_null.load());
json->set("rs_dmfile_read_with_all", rs_dmfile_read_with_all.load());

json->set("num_remote_region", total_remote_region_num.load());
json->set("num_local_region", total_local_region_num.load());
Loading