Skip to content

Commit

Permalink
Storage: minor refine of bitmap and add more tests (#6718)
Browse files Browse the repository at this point in the history
ref #6296
  • Loading branch information
Lloyd-Pottiger authored Feb 24, 2023
1 parent 8d2736a commit d4dc6f4
Show file tree
Hide file tree
Showing 7 changed files with 306 additions and 149 deletions.
7 changes: 7 additions & 0 deletions dbms/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,10 @@ add_subdirectory (Encryption)
if (ENABLE_TESTS)
add_subdirectory (TestUtils EXCLUDE_FROM_ALL)
endif ()

check_then_add_sources_compile_flag (
TIFLASH_ENABLE_ARCH_HASWELL_SUPPORT
"${TIFLASH_COMPILER_ARCH_HASWELL_FLAG}"
DeltaMerge/BitmapFilter/BitmapFilter.cpp
DeltaMerge/DMVersionFilterBlockInputStream.cpp
)
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Columns/ColumnsCommon.h>
#include <Storages/DeltaMerge/BitmapFilter/BitmapFilter.h>
#include <Storages/DeltaMerge/BitmapFilter/BitmapFilterBlockInputStream.h>
#include <Storages/DeltaMerge/DeltaMergeHelpers.h>
Expand Down Expand Up @@ -53,9 +54,10 @@ Block BitmapFilterBlockInputStream::readImpl(FilterPtr & res_filter, bool return
}
else
{
size_t passed_count = countBytesInFilter(filter);
for (auto & col : block)
{
col.column = col.column->filter(filter, block.rows());
col.column = col.column->filter(filter, passed_count);
}
}
}
Expand Down
8 changes: 7 additions & 1 deletion dbms/src/Storages/DeltaMerge/RowKeyFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#pragma once

#include <Columns/ColumnConst.h>
#include <Columns/ColumnsCommon.h>
#include <DataStreams/IBlockInputStream.h>
#include <Storages/DeltaMerge/DeltaMergeHelpers.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
Expand Down Expand Up @@ -162,7 +163,7 @@ inline Block filterUnsorted(const RowKeyRanges & rowkey_ranges, Block && block,
}
}
}
size_t passed_count = std::count(filter.cbegin(), filter.cend(), 1);
size_t passed_count = countBytesInFilter(filter);

if (!passed_count)
return {};
Expand All @@ -181,6 +182,11 @@ inline Block filterUnsorted(const RowKeyRanges & rowkey_ranges, Block && block,
}
} // namespace RowKeyFilter

/**
* DMRowKeyFilterBlockInputStream is used to filter block by rowkey ranges.
* Rows whose rowkey is not in the rowkey ranges will be filtered.
* Basically, only the rows in first and the last block of the child stream will be filtered.
*/
template <bool is_block_sorted>
class DMRowKeyFilterBlockInputStream : public IBlockInputStream
{
Expand Down
178 changes: 97 additions & 81 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
#include <fmt/core.h>

#include <ext/scope_guard.h>
#include <memory>
#include <numeric>

namespace ProfileEvents
Expand Down Expand Up @@ -2295,79 +2294,103 @@ BitmapFilterPtr Segment::buildBitmapFilterNormal(const DMContext & dm_context,
return bitmap_filter;
}

struct PackInfo
namespace
{

struct Range
{
UInt64 pack_id;
UInt64 offset;
UInt64 rows;

Range(UInt64 offset_, UInt64 rows_)
: offset(offset_)
, rows(rows_)
{}
};
std::pair<std::vector<PackInfo>, IdSetPtr> parseDMFilePackInfo(const DMFilePtr & dmfile,
const DMContext & dm_context,
const RowKeyRanges & read_ranges,
const RSOperatorPtr & filter,
UInt64 max_version)
{
DMFilePackFilter pack_filter = DMFilePackFilter::loadFrom(
dmfile,
dm_context.db_context.getMinMaxIndexCache(),
/*set_cache_if_miss*/ true,
read_ranges,
filter,
/*read_pack*/ {},
dm_context.db_context.getFileProvider(),
dm_context.db_context.getReadLimiter(),
dm_context.scan_context,
dm_context.tracing_id);
const auto & use_packs = pack_filter.getUsePacks();
const auto & handle_res = pack_filter.getHandleRes();
const auto & pack_stats = dmfile->getPackStats();

std::pair<std::vector<Range>, std::vector<IdSetPtr>> parseDMFilePackInfo(const DMFiles & dmfiles,
const DMContext & dm_context,
const RowKeyRanges & read_ranges,
const RSOperatorPtr & filter,
UInt64 max_version)
{
// Packs that all rows compliant with MVCC filter and RowKey filter requirements.
// For building bitmap filter, we don't need to read these packs,
// just set corresponding positions in the bitmap to true.
std::vector<PackInfo> all_packs;
// So we record the offset and rows of these packs and merge continuous ranges.
std::vector<Range> skipped_ranges;
// Packs that some rows compliant with MVCC filter and RowKey filter requirements.
// We need to read these packs and do RowKey filter and MVCC filter for them.
auto some_packs = std::make_shared<IdSet>();
UInt64 rows = 0;
for (size_t pack_id = 0; pack_id < pack_stats.size(); pack_id++)
{
const auto & pack_stat = pack_stats[pack_id];
rows += pack_stat.rows;
if (!use_packs[pack_id])
{
continue;
}
std::vector<IdSetPtr> some_packs_sets;
some_packs_sets.reserve(dmfiles.size());

// The offset of the first row in the current range.
size_t offset = 0;
// The number of rows in the current range.
size_t rows = 0;
UInt32 preceded_rows = 0;

for (const auto & dmfile : dmfiles)
{
DMFilePackFilter pack_filter = DMFilePackFilter::loadFrom(
dmfile,
dm_context.db_context.getMinMaxIndexCache(),
/*set_cache_if_miss*/ true,
read_ranges,
filter,
/*read_pack*/ {},
dm_context.db_context.getFileProvider(),
dm_context.db_context.getReadLimiter(),
dm_context.scan_context,
dm_context.tracing_id);
const auto & use_packs = pack_filter.getUsePacks();
const auto & handle_res = pack_filter.getHandleRes();
const auto & pack_stats = dmfile->getPackStats();

// assert(handle_res[pack_id] == RSResult::Some || handle_res[pack_id] == RSResult::All);
auto some_packs_set = std::make_shared<IdSet>();

if (handle_res[pack_id] == RSResult::Some)
for (size_t pack_id = 0; pack_id < pack_stats.size(); ++pack_id)
{
// We need to read this pack to do RowKey filter.
some_packs->insert(pack_id);
continue;
}
const auto & pack_stat = pack_stats[pack_id];
preceded_rows += pack_stat.rows;
if (!use_packs[pack_id])
{
continue;
}

// assert(handle_res[pack_id] == RSResult::All);
if (handle_res[pack_id] == RSResult::Some
|| pack_stat.not_clean > 0
|| pack_filter.getMaxVersion(pack_id) > max_version)
{
// We need to read this pack to do RowKey or MVCC filter.
some_packs_set->insert(pack_id);
continue;
}

if (pack_stat.not_clean > 0)
{
// We need to read this pack to do MVCC filter.
some_packs->insert(pack_id);
continue;
// When this pack is next to the previous pack, we merge them.
// Otherwise, we record the previous continuous packs and start a new one.
if (offset + rows == preceded_rows - pack_stat.rows)
{
rows += pack_stat.rows;
}
else
{
skipped_ranges.emplace_back(offset, rows);
offset = preceded_rows - pack_stat.rows;
rows = pack_stat.rows;
}
}

if (pack_filter.getMaxVersion(pack_id) > max_version)
{
// We need to read this pack to do MVCC filter.
some_packs->insert(pack_id);
continue;
}
all_packs.push_back({pack_id, rows - pack_stat.rows, pack_stat.rows});
some_packs_sets.push_back(some_packs_set);
}
return {all_packs, some_packs};
if (rows > 0)
skipped_ranges.emplace_back(offset, rows);

return {skipped_ranges, some_packs_sets};
}

} // namespace

BitmapFilterPtr Segment::buildBitmapFilterStableOnly(const DMContext & dm_context,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRanges & read_ranges,
Expand All @@ -2377,44 +2400,37 @@ BitmapFilterPtr Segment::buildBitmapFilterStableOnly(const DMContext & dm_contex
{
Stopwatch sw;
const auto & dmfiles = segment_snap->stable->getDMFiles();
RUNTIME_CHECK(!dmfiles.empty(), dmfiles.size());
std::vector<std::vector<PackInfo>> all_packs{dmfiles.size()};
std::vector<IdSetPtr> some_packs{dmfiles.size()};
bool all_match = true;
size_t some_packs_count = 0;
for (size_t i = 0; i < dmfiles.size(); i++)
{
std::tie(all_packs[i], some_packs[i]) = parseDMFilePackInfo(dmfiles[i],
dm_context,
read_ranges,
filter,
max_version);
all_match = all_match && (all_packs[i].size() == dmfiles[i]->getPacks());
some_packs_count += some_packs[i]->size();
}
RUNTIME_CHECK(!dmfiles.empty());

if (all_match)
auto [skipped_ranges, some_packs_sets] = parseDMFilePackInfo(dmfiles, dm_context, read_ranges, filter, max_version);

if (skipped_ranges.size() == 1 && skipped_ranges[0].offset == 0 && skipped_ranges[0].rows == segment_snap->stable->getDMFilesRows())
{
LOG_DEBUG(log, "all match, total_rows={}, cost={}ms", segment_snap->stable->getDMFilesRows(), sw.elapsedMilliseconds());
return std::make_shared<BitmapFilter>(segment_snap->stable->getDMFilesRows(), /*default_value*/ true);
}

auto bitmap_filter = std::make_shared<BitmapFilter>(segment_snap->stable->getDMFilesRows(), /*default_value*/ false);
UInt32 preceded_dmfile_rows = 0;
for (size_t i = 0; i < dmfiles.size(); i++)
for (const auto & range : skipped_ranges)
{
bitmap_filter->set(range.offset, range.rows);
}

bool has_some_packs = false;
for (const auto & some_packs_set : some_packs_sets)
{
const auto & all_pack = all_packs[i];
for (const auto & pack : all_pack)
if (!some_packs_set->empty())
{
bitmap_filter->set(pack.offset + preceded_dmfile_rows, pack.rows);
has_some_packs = true;
break;
}
preceded_dmfile_rows += dmfiles[i]->getRows();
}
if (some_packs_count <= 0)
if (!has_some_packs)
{
return bitmap_filter;
}
ColumnDefines columns_to_read{

const ColumnDefines columns_to_read{
getExtraHandleColumnDefine(is_common_handle),
getVersionColumnDefine(),
getTagColumnDefine(),
Expand All @@ -2428,10 +2444,10 @@ BitmapFilterPtr Segment::buildBitmapFilterStableOnly(const DMContext & dm_contex
/*enable_handle_clean_read*/ false,
/*is_fast_scan*/ false,
/*enable_del_clean_read*/ false,
/*read_packs*/ some_packs,
/*read_packs*/ some_packs_sets,
/*need_row_id*/ true);
stream = std::make_shared<DMRowKeyFilterBlockInputStream<true>>(stream, read_ranges, 0);
ColumnDefines read_columns{
const ColumnDefines read_columns{
getExtraHandleColumnDefine(is_common_handle),
};
stream = std::make_shared<DMVersionFilterBlockInputStream<DM_VERSION_FILTER_MODE_MVCC>>(
Expand All @@ -2441,7 +2457,7 @@ BitmapFilterPtr Segment::buildBitmapFilterStableOnly(const DMContext & dm_contex
is_common_handle,
dm_context.tracing_id);
bitmap_filter->set(stream);
LOG_DEBUG(log, "some_packs={}, total_rows={}, cost={}ms", some_packs_count, segment_snap->stable->getDMFilesRows(), sw.elapsedMilliseconds());
LOG_DEBUG(log, "total_rows={}, cost={}ms", segment_snap->stable->getDMFilesRows(), sw.elapsedMilliseconds());
return bitmap_filter;
}

Expand Down
10 changes: 6 additions & 4 deletions dbms/src/Storages/DeltaMerge/SkippableBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include <Columns/ColumnsNumber.h>
#include <Core/Block.h>
#include <DataStreams/IBlockInputStream.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/DeltaMergeHelpers.h>

namespace DB
{
Expand All @@ -25,7 +27,7 @@ namespace DM
class SkippableBlockInputStream : public IBlockInputStream
{
public:
virtual ~SkippableBlockInputStream() = default;
~SkippableBlockInputStream() override = default;

/// Return false if it is the end of stream.
virtual bool getSkippedRows(size_t & skip_rows) = 0;
Expand All @@ -37,7 +39,7 @@ using SkippableBlockInputStreams = std::vector<SkippableBlockInputStreamPtr>;
class EmptySkippableBlockInputStream : public SkippableBlockInputStream
{
public:
EmptySkippableBlockInputStream(const ColumnDefines & read_columns_)
explicit EmptySkippableBlockInputStream(const ColumnDefines & read_columns_)
: read_columns(read_columns_)
{}

Expand All @@ -50,7 +52,7 @@ class EmptySkippableBlockInputStream : public SkippableBlockInputStream
Block read() override { return {}; }

private:
ColumnDefines read_columns;
ColumnDefines read_columns{};
};

template <bool need_row_id = false>
Expand Down Expand Up @@ -82,7 +84,7 @@ class ConcatSkippableBlockInputStream : public SkippableBlockInputStream
skip_rows = 0;
while (current_stream != children.end())
{
auto skippable_stream = dynamic_cast<SkippableBlockInputStream *>((*current_stream).get());
auto * skippable_stream = dynamic_cast<SkippableBlockInputStream *>((*current_stream).get());

size_t skip;
bool has_next_block = skippable_stream->getSkippedRows(skip);
Expand Down
Loading

0 comments on commit d4dc6f4

Please sign in to comment.